diff --git a/app/grpc/tx/server.go b/app/grpc/tx/server.go
index c6e8229a0b..4e319468b5 100644
--- a/app/grpc/tx/server.go
+++ b/app/grpc/tx/server.go
@@ -90,3 +90,64 @@ func (s *txServer) TxStatus(ctx context.Context, req *TxStatusRequest) (*TxStatu
 		Signers:       resTx.Signers,
 	}, nil
 }
+
+func (s *txServer) TxStatusBatch(ctx context.Context, req *TxStatusBatchRequest) (*TxStatusBatchResponse, error) {
+	if req == nil {
+		return nil, status.Error(codes.InvalidArgument, "request cannot be nil")
+	}
+
+	if len(req.TxIds) > 20 {
+		return nil, status.Error(codes.InvalidArgument, "maximum of 20 tx ids allowed")
+	}
+
+	if len(req.TxIds) == 0 {
+		return nil, status.Error(codes.InvalidArgument, "tx ids cannot be empty")
+	}
+
+	node, err := s.clientCtx.GetNode()
+	if err != nil {
+		return nil, err
+	}
+
+	nodeTxStatus, ok := node.(rpcclient.SignClient)
+	if !ok {
+		return nil, status.Error(codes.Unimplemented, "node does not support tx status batch")
+	}
+
+	txIDs := make([][]byte, len(req.TxIds))
+	for i, txId := range req.TxIds {
+		txID, err := hex.DecodeString(txId)
+		if err != nil {
+			return nil, status.Errorf(codes.InvalidArgument, "invalid tx id: %s", err)
+		}
+		txIDs[i] = txID
+	}
+
+	txStatusBatchResponses, err := nodeTxStatus.TxStatusBatch(ctx, txIDs)
+	if err != nil {
+		return nil, err
+	}
+
+	responses := make([]*TxStatusResult, len(txStatusBatchResponses.Statuses))
+	// The loop variable is named txStatus so it does not shadow the grpc status package.
+	for i, txStatus := range txStatusBatchResponses.Statuses {
+		responses[i] = &TxStatusResult{
+			TxHash: req.TxIds[i],
+			Status: &TxStatusResponse{
+				Height:        txStatus.Result.Height,
+				Index:         txStatus.Result.Index,
+				ExecutionCode: txStatus.Result.ExecutionCode,
+				Error:         txStatus.Result.Error,
+				Status:        txStatus.Result.Status,
+				GasWanted:     txStatus.Result.GasWanted,
+				GasUsed:       txStatus.Result.GasUsed,
+				Codespace:     txStatus.Result.Codespace,
+				Signers:       txStatus.Result.Signers,
+			},
+		}
+	}
+
+	return &TxStatusBatchResponse{
+		Statuses: responses,
+	}, nil
+}
diff --git a/app/grpc/tx/tx.pb.go b/app/grpc/tx/tx.pb.go
index 542cef60b3..795f61d790 100644
--- a/app/grpc/tx/tx.pb.go
+++ b/app/grpc/tx/tx.pb.go
@@ -6,15 +6,16 @@ package tx
 import (
 	context "context"
 	fmt "fmt"
+	io "io"
+	math "math"
+	math_bits "math/bits"
+
 	grpc1 "github.com/cosmos/gogoproto/grpc"
 	proto "github.com/cosmos/gogoproto/proto"
 	_ "google.golang.org/genproto/googleapis/api/annotations"
 	grpc "google.golang.org/grpc"
 	codes "google.golang.org/grpc/codes"
 	status "google.golang.org/grpc/status"
-	io "io"
-	math "math"
-	math_bits "math/bits"
 )

 // Reference imports to suppress errors if they are not otherwise used.
@@ -86,12 +87,13 @@ type TxStatusResponse struct {
 	Error string `protobuf:"bytes,4,opt,name=error,proto3" json:"error,omitempty"`
 	// status is the status of the transaction.
 	Status string `protobuf:"bytes,5,opt,name=status,proto3" json:"status,omitempty"`
-	// the error namespace/module
+	// the error namespace/module.
 	Codespace string `protobuf:"bytes,6,opt,name=codespace,proto3" json:"codespace,omitempty"`
-	// requested gas limit
+	// requested gas limit.
 	GasWanted int64 `protobuf:"varint,7,opt,name=gas_wanted,json=gasWanted,proto3" json:"gas_wanted,omitempty"`
-	// actual gas consumed
-	GasUsed int64 `protobuf:"varint,8,opt,name=gas_used,json=gasUsed,proto3" json:"gas_used,omitempty"`
+	// actual gas consumed.
+	GasUsed int64 `protobuf:"varint,8,opt,name=gas_used,json=gasUsed,proto3" json:"gas_used,omitempty"`
+	// signers of the transaction.
Signers []string `protobuf:"bytes,9,rep,name=signers,proto3" json:"signers,omitempty"` } @@ -191,41 +193,196 @@ func (m *TxStatusResponse) GetSigners() []string { return nil } +// TxStatusBatchRequest is the request type for the batch TxStatus gRPC method. +type TxStatusBatchRequest struct { + // array of hex encoded tx hashes (each hash should be 64 characters long representing 32 bytes) + TxIds []string `protobuf:"bytes,1,rep,name=tx_ids,json=txIds,proto3" json:"tx_ids,omitempty"` +} + +func (m *TxStatusBatchRequest) Reset() { *m = TxStatusBatchRequest{} } +func (m *TxStatusBatchRequest) String() string { return proto.CompactTextString(m) } +func (*TxStatusBatchRequest) ProtoMessage() {} +func (*TxStatusBatchRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_7d8b070565b0dcb6, []int{2} +} +func (m *TxStatusBatchRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *TxStatusBatchRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_TxStatusBatchRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *TxStatusBatchRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_TxStatusBatchRequest.Merge(m, src) +} +func (m *TxStatusBatchRequest) XXX_Size() int { + return m.Size() +} +func (m *TxStatusBatchRequest) XXX_DiscardUnknown() { + xxx_messageInfo_TxStatusBatchRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_TxStatusBatchRequest proto.InternalMessageInfo + +func (m *TxStatusBatchRequest) GetTxIds() []string { + if m != nil { + return m.TxIds + } + return nil +} + +// TxStatusResult represents a single transaction status result in a batch response. +type TxStatusResult struct { + TxHash string `protobuf:"bytes,1,opt,name=tx_hash,json=txHash,proto3" json:"tx_hash,omitempty"` + Status *TxStatusResponse `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"` +} + +func (m *TxStatusResult) Reset() { *m = TxStatusResult{} } +func (m *TxStatusResult) String() string { return proto.CompactTextString(m) } +func (*TxStatusResult) ProtoMessage() {} +func (*TxStatusResult) Descriptor() ([]byte, []int) { + return fileDescriptor_7d8b070565b0dcb6, []int{3} +} +func (m *TxStatusResult) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *TxStatusResult) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_TxStatusResult.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *TxStatusResult) XXX_Merge(src proto.Message) { + xxx_messageInfo_TxStatusResult.Merge(m, src) +} +func (m *TxStatusResult) XXX_Size() int { + return m.Size() +} +func (m *TxStatusResult) XXX_DiscardUnknown() { + xxx_messageInfo_TxStatusResult.DiscardUnknown(m) +} + +var xxx_messageInfo_TxStatusResult proto.InternalMessageInfo + +func (m *TxStatusResult) GetTxHash() string { + if m != nil { + return m.TxHash + } + return "" +} + +func (m *TxStatusResult) GetStatus() *TxStatusResponse { + if m != nil { + return m.Status + } + return nil +} + +// TxStatusBatchResponse is a response type for batched TxStatus query. It contains an array of transaction status results. 
+type TxStatusBatchResponse struct { + Statuses []*TxStatusResult `protobuf:"bytes,1,rep,name=statuses,proto3" json:"statuses,omitempty"` +} + +func (m *TxStatusBatchResponse) Reset() { *m = TxStatusBatchResponse{} } +func (m *TxStatusBatchResponse) String() string { return proto.CompactTextString(m) } +func (*TxStatusBatchResponse) ProtoMessage() {} +func (*TxStatusBatchResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_7d8b070565b0dcb6, []int{4} +} +func (m *TxStatusBatchResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *TxStatusBatchResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_TxStatusBatchResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *TxStatusBatchResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_TxStatusBatchResponse.Merge(m, src) +} +func (m *TxStatusBatchResponse) XXX_Size() int { + return m.Size() +} +func (m *TxStatusBatchResponse) XXX_DiscardUnknown() { + xxx_messageInfo_TxStatusBatchResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_TxStatusBatchResponse proto.InternalMessageInfo + +func (m *TxStatusBatchResponse) GetStatuses() []*TxStatusResult { + if m != nil { + return m.Statuses + } + return nil +} + func init() { proto.RegisterType((*TxStatusRequest)(nil), "celestia.core.v1.tx.TxStatusRequest") proto.RegisterType((*TxStatusResponse)(nil), "celestia.core.v1.tx.TxStatusResponse") + proto.RegisterType((*TxStatusBatchRequest)(nil), "celestia.core.v1.tx.TxStatusBatchRequest") + proto.RegisterType((*TxStatusResult)(nil), "celestia.core.v1.tx.TxStatusResult") + proto.RegisterType((*TxStatusBatchResponse)(nil), "celestia.core.v1.tx.TxStatusBatchResponse") } func init() { proto.RegisterFile("celestia/core/v1/tx/tx.proto", fileDescriptor_7d8b070565b0dcb6) } var fileDescriptor_7d8b070565b0dcb6 = []byte{ - // 404 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x52, 0x4d, 0x8b, 0x14, 0x31, - 0x10, 0x9d, 0xee, 0xd9, 0xf9, 0xe8, 0xc0, 0xaa, 0x64, 0x45, 0xe2, 0xd2, 0x36, 0xc3, 0xb0, 0x2b, - 0x73, 0xb1, 0xc3, 0xea, 0x3f, 0xd0, 0xd3, 0x5e, 0xdb, 0x15, 0xc1, 0xcb, 0x90, 0xed, 0x14, 0x99, - 0xc0, 0xda, 0x69, 0x93, 0xea, 0x35, 0x20, 0x7b, 0xd1, 0x83, 0x57, 0xc1, 0x3f, 0xe5, 0x71, 0xc0, - 0x8b, 0x47, 0x99, 0xf1, 0x87, 0x48, 0xa7, 0xe7, 0x03, 0x64, 0xc0, 0x43, 0x20, 0xef, 0xbd, 0xaa, - 0x7a, 0xe1, 0x55, 0x48, 0x5a, 0xc2, 0x0d, 0x38, 0xd4, 0x82, 0x97, 0xc6, 0x02, 0xbf, 0xbd, 0xe0, - 0xe8, 0x39, 0xfa, 0xbc, 0xb6, 0x06, 0x0d, 0x3d, 0xd9, 0xaa, 0x79, 0xab, 0xe6, 0xb7, 0x17, 0x39, - 0xfa, 0xd3, 0x54, 0x19, 0xa3, 0x6e, 0x80, 0x8b, 0x5a, 0x73, 0x51, 0x55, 0x06, 0x05, 0x6a, 0x53, - 0xb9, 0xae, 0x65, 0xfa, 0x94, 0xdc, 0xbf, 0xf2, 0xaf, 0x51, 0x60, 0xe3, 0x0a, 0xf8, 0xd0, 0x80, - 0x43, 0x7a, 0x42, 0x06, 0xe8, 0xe7, 0x5a, 0xb2, 0x68, 0x12, 0xcd, 0x92, 0xe2, 0x08, 0xfd, 0xa5, - 0x9c, 0x7e, 0x8d, 0xc9, 0x83, 0x7d, 0xa1, 0xab, 0x4d, 0xe5, 0x80, 0x3e, 0x22, 0xc3, 0x05, 0x68, - 0xb5, 0xc0, 0x50, 0xda, 0x2f, 0x36, 0x88, 0x3e, 0x24, 0x03, 0x5d, 0x49, 0xf0, 0x2c, 0x9e, 0x44, - 0xb3, 0xe3, 0xa2, 0x03, 0xf4, 0x9c, 0xdc, 0x03, 0x0f, 0x65, 0xd3, 0xda, 0xcf, 0x4b, 0x23, 0x81, - 0xf5, 0x83, 0x7c, 0xbc, 0x63, 0x5f, 0x19, 0x09, 0x6d, 0x33, 0x58, 0x6b, 0x2c, 0x3b, 0x0a, 0xf6, - 0x1d, 0x68, 0xad, 0x5c, 0x30, 0x67, 0x83, 0x40, 0x6f, 0x10, 0x4d, 0x49, 0xd2, 0x8e, 0x72, 0xb5, - 0x28, 0x81, 0x0d, 0x83, 0xb4, 0x27, 
0xe8, 0x13, 0x42, 0x94, 0x70, 0xf3, 0x8f, 0xa2, 0x42, 0x90, - 0x6c, 0x14, 0x1e, 0x99, 0x28, 0xe1, 0xde, 0x06, 0x82, 0x3e, 0x26, 0xe3, 0x56, 0x6e, 0x1c, 0x48, - 0x36, 0x0e, 0xe2, 0x48, 0x09, 0xf7, 0xc6, 0x81, 0xa4, 0x8c, 0x8c, 0x9c, 0x56, 0x15, 0x58, 0xc7, - 0x92, 0x49, 0x7f, 0x96, 0x14, 0x5b, 0xf8, 0xfc, 0x4b, 0x44, 0xe2, 0x2b, 0x4f, 0xef, 0xc8, 0x78, - 0x9b, 0x07, 0x3d, 0xcb, 0x0f, 0x04, 0x9f, 0xff, 0x93, 0xeb, 0xe9, 0xf9, 0x7f, 0xaa, 0xba, 0x50, - 0xa7, 0x67, 0x9f, 0x7f, 0xfe, 0xf9, 0x1e, 0x67, 0x34, 0xe5, 0x87, 0x76, 0xfd, 0x29, 0xac, 0xe6, - 0xee, 0xe5, 0xe5, 0x8f, 0x55, 0x16, 0x2d, 0x57, 0x59, 0xf4, 0x7b, 0x95, 0x45, 0xdf, 0xd6, 0x59, - 0x6f, 0xb9, 0xce, 0x7a, 0xbf, 0xd6, 0x59, 0xef, 0x1d, 0x57, 0x1a, 0x17, 0xcd, 0x75, 0x5e, 0x9a, - 0xf7, 0xbb, 0x09, 0xc6, 0xaa, 0xdd, 0xfd, 0x99, 0xa8, 0x6b, 0xde, 0x1e, 0x65, 0xeb, 0x92, 0xa3, - 0xbf, 0x1e, 0x86, 0x9f, 0xf0, 0xe2, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x71, 0xcb, 0xde, 0x23, - 0x5c, 0x02, 0x00, 0x00, + // 530 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x53, 0x4f, 0x6f, 0xd3, 0x4e, + 0x10, 0x8d, 0x93, 0xe6, 0xdf, 0x54, 0xe9, 0xef, 0xa7, 0x6d, 0x0b, 0x4b, 0x14, 0x4c, 0x64, 0x1a, + 0x14, 0x2a, 0xd5, 0x56, 0xc3, 0x0d, 0x09, 0x21, 0x95, 0x0b, 0xbd, 0x9a, 0x22, 0x10, 0x97, 0x68, + 0x63, 0xaf, 0x6c, 0x4b, 0xc1, 0x6b, 0xbc, 0xe3, 0xb2, 0x12, 0xea, 0x85, 0x0b, 0x27, 0x24, 0xa4, + 0x7e, 0x29, 0x8e, 0x95, 0xb8, 0x70, 0x44, 0x09, 0x1f, 0x04, 0xed, 0x26, 0x4e, 0x4b, 0x15, 0xd1, + 0x1e, 0x2c, 0x79, 0xe6, 0xbd, 0x99, 0x79, 0xf3, 0xc6, 0x86, 0x5e, 0xc0, 0xa7, 0x5c, 0x62, 0xc2, + 0xbc, 0x40, 0xe4, 0xdc, 0x3b, 0x3d, 0xf4, 0x50, 0x79, 0xa8, 0xdc, 0x2c, 0x17, 0x28, 0xc8, 0x76, + 0x89, 0xba, 0x1a, 0x75, 0x4f, 0x0f, 0x5d, 0x54, 0xdd, 0x5e, 0x24, 0x44, 0x34, 0xe5, 0x1e, 0xcb, + 0x12, 0x8f, 0xa5, 0xa9, 0x40, 0x86, 0x89, 0x48, 0xe5, 0xa2, 0xc4, 0x79, 0x04, 0xff, 0x9d, 0xa8, + 0x57, 0xc8, 0xb0, 0x90, 0x3e, 0xff, 0x50, 0x70, 0x89, 0x64, 0x1b, 0xea, 0xa8, 0xc6, 0x49, 0x48, + 0xad, 0xbe, 0x35, 0x6c, 0xfb, 0x1b, 0xa8, 0x8e, 0x43, 0xe7, 0x4b, 0x15, 0xfe, 0xbf, 0x24, 0xca, + 0x4c, 0xa4, 0x92, 0x93, 0x3b, 0xd0, 0x88, 0x79, 0x12, 0xc5, 0x68, 0xa8, 0x35, 0x7f, 0x19, 0x91, + 0x1d, 0xa8, 0x27, 0x69, 0xc8, 0x15, 0xad, 0xf6, 0xad, 0x61, 0xc7, 0x5f, 0x04, 0x64, 0x00, 0x5b, + 0x5c, 0xf1, 0xa0, 0xd0, 0xe3, 0xc7, 0x81, 0x08, 0x39, 0xad, 0x19, 0xb8, 0xb3, 0xca, 0xbe, 0x10, + 0x21, 0xd7, 0xc5, 0x3c, 0xcf, 0x45, 0x4e, 0x37, 0xcc, 0xf8, 0x45, 0xa0, 0x47, 0x49, 0x33, 0x9c, + 0xd6, 0x4d, 0x7a, 0x19, 0x91, 0x1e, 0xb4, 0x75, 0x2b, 0x99, 0xb1, 0x80, 0xd3, 0x86, 0x81, 0x2e, + 0x13, 0xe4, 0x3e, 0x40, 0xc4, 0xe4, 0xf8, 0x23, 0x4b, 0x91, 0x87, 0xb4, 0x69, 0x44, 0xb6, 0x23, + 0x26, 0xdf, 0x98, 0x04, 0xb9, 0x07, 0x2d, 0x0d, 0x17, 0x92, 0x87, 0xb4, 0x65, 0xc0, 0x66, 0xc4, + 0xe4, 0x6b, 0xc9, 0x43, 0x42, 0xa1, 0x29, 0x93, 0x28, 0xe5, 0xb9, 0xa4, 0xed, 0x7e, 0x6d, 0xd8, + 0xf6, 0xcb, 0xd0, 0x39, 0x80, 0x9d, 0xd2, 0x88, 0x23, 0x86, 0x41, 0x5c, 0xda, 0xb6, 0x0b, 0x0d, + 0x63, 0x9b, 0xa4, 0x96, 0x29, 0xa8, 0x6b, 0xdf, 0xa4, 0x13, 0xc3, 0xd6, 0x15, 0xdf, 0x8a, 0x29, + 0x92, 0xbb, 0xd0, 0x44, 0x35, 0x8e, 0x99, 0x8c, 0x97, 0x0e, 0x37, 0x50, 0xbd, 0x64, 0x32, 0x26, + 0xcf, 0x56, 0x3b, 0x6a, 0xdf, 0x36, 0x47, 0x03, 0x77, 0xcd, 0x3d, 0xdd, 0xeb, 0x57, 0x28, 0xad, + 0x70, 0xde, 0xc2, 0xee, 0x35, 0x61, 0xcb, 0x33, 0x3d, 0x87, 0xd6, 0x82, 0xc2, 0x17, 0xda, 0x36, + 0x47, 0x0f, 0x6f, 0xea, 0x5c, 0x4c, 0xd1, 0x5f, 0x15, 0x8d, 0xce, 0xab, 0x50, 0x3d, 0x51, 0xe4, + 0x0c, 0x5a, 0x25, 0x85, 0xec, 0xdd, 0xd0, 0xc1, 0x78, 0xd2, 0xbd, 0xdd, 0x06, 0xce, 
0xde, 0xe7,
+	0x1f, 0xbf, 0xcf, 0xab, 0x36, 0xe9, 0x79, 0xeb, 0x3e, 0xef, 0x4f, 0xc6, 0xd6, 0x33, 0xf2, 0xd5,
+	0x82, 0xce, 0x5f, 0x0b, 0x92, 0xc7, 0xff, 0x6c, 0x7f, 0xf5, 0x3a, 0xdd, 0xfd, 0xdb, 0x50, 0x97,
+	0x72, 0x06, 0x46, 0xce, 0x83, 0xa7, 0xd6, 0xbe, 0xd3, 0x5d, 0xab, 0x68, 0xa2, 0xe9, 0x47, 0xc7,
+	0xdf, 0x67, 0xb6, 0x75, 0x31, 0xb3, 0xad, 0x5f, 0x33, 0xdb, 0xfa, 0x36, 0xb7, 0x2b, 0x17, 0x73,
+	0xbb, 0x72, 0xe5, 0x9d, 0x17, 0x25, 0x18, 0x17, 0x13, 0x37, 0x10, 0xef, 0x57, 0xf5, 0x22, 0x8f,
+	0x56, 0xef, 0x07, 0x2c, 0xcb, 0x3c, 0xfd, 0x44, 0x79, 0x16, 0x78, 0xa8, 0x26, 0x0d, 0xf3, 0x33,
+	0x3e, 0xf9, 0x13, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0xf9, 0xc7, 0x7f, 0xa8, 0xdf, 0x03,
+	0x00, 0x00,
 }

 // Reference imports to suppress errors if they are not otherwise used.
@@ -247,6 +404,8 @@ type TxClient interface {
 	//  - Evicted
 	//  - Unknown
 	TxStatus(ctx context.Context, in *TxStatusRequest, opts ...grpc.CallOption) (*TxStatusResponse, error)
+	// TxStatusBatch returns the status of each transaction in a batch.
+	TxStatusBatch(ctx context.Context, in *TxStatusBatchRequest, opts ...grpc.CallOption) (*TxStatusBatchResponse, error)
 }

 type txClient struct {
@@ -266,6 +425,15 @@ func (c *txClient) TxStatus(ctx context.Context, in *TxStatusRequest, opts ...gr
 	return out, nil
 }

+func (c *txClient) TxStatusBatch(ctx context.Context, in *TxStatusBatchRequest, opts ...grpc.CallOption) (*TxStatusBatchResponse, error) {
+	out := new(TxStatusBatchResponse)
+	err := c.cc.Invoke(ctx, "/celestia.core.v1.tx.Tx/TxStatusBatch", in, out, opts...)
+	if err != nil {
+		return nil, err
+	}
+	return out, nil
+}
+
 // TxServer is the server API for Tx service.
 type TxServer interface {
 	// TxStatus returns the status of a transaction. There are four possible
@@ -275,6 +444,8 @@ type TxServer interface {
 	//  - Evicted
 	//  - Unknown
 	TxStatus(context.Context, *TxStatusRequest) (*TxStatusResponse, error)
+	// TxStatusBatch returns the status of each transaction in a batch.
+	TxStatusBatch(context.Context, *TxStatusBatchRequest) (*TxStatusBatchResponse, error)
 }

 // UnimplementedTxServer can be embedded to have forward compatible implementations.
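For reference, a minimal sketch of how a caller could exercise the new batch RPC through the generated client above. The endpoint address, dial options, and the placeholder hash are assumptions, not part of this change:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/celestiaorg/celestia-app/v6/app/grpc/tx"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// localhost:9090 is an assumed consensus-node gRPC address.
	conn, err := grpc.NewClient("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := tx.NewTxClient(conn)
	// At most 20 hex-encoded hashes per request, per the server-side validation.
	resp, err := client.TxStatusBatch(context.Background(), &tx.TxStatusBatchRequest{
		TxIds: []string{"<64-char hex tx hash>"}, // placeholder
	})
	if err != nil {
		log.Fatal(err)
	}
	for _, result := range resp.Statuses {
		fmt.Printf("%s: %s (height %d)\n", result.TxHash, result.Status.Status, result.Status.Height)
	}
}
```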
@@ -284,6 +455,9 @@ type UnimplementedTxServer struct { func (*UnimplementedTxServer) TxStatus(ctx context.Context, req *TxStatusRequest) (*TxStatusResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method TxStatus not implemented") } +func (*UnimplementedTxServer) TxStatusBatch(ctx context.Context, req *TxStatusBatchRequest) (*TxStatusBatchResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method TxStatusBatch not implemented") +} func RegisterTxServer(s grpc1.Server, srv TxServer) { s.RegisterService(&_Tx_serviceDesc, srv) @@ -307,6 +481,24 @@ func _Tx_TxStatus_Handler(srv interface{}, ctx context.Context, dec func(interfa return interceptor(ctx, in, info, handler) } +func _Tx_TxStatusBatch_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(TxStatusBatchRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TxServer).TxStatusBatch(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/celestia.core.v1.tx.Tx/TxStatusBatch", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TxServer).TxStatusBatch(ctx, req.(*TxStatusBatchRequest)) + } + return interceptor(ctx, in, info, handler) +} + var Tx_serviceDesc = _Tx_serviceDesc var _Tx_serviceDesc = grpc.ServiceDesc{ ServiceName: "celestia.core.v1.tx.Tx", @@ -316,6 +508,10 @@ var _Tx_serviceDesc = grpc.ServiceDesc{ MethodName: "TxStatus", Handler: _Tx_TxStatus_Handler, }, + { + MethodName: "TxStatusBatch", + Handler: _Tx_TxStatusBatch_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "celestia/core/v1/tx/tx.proto", @@ -429,6 +625,117 @@ func (m *TxStatusResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *TxStatusBatchRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *TxStatusBatchRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *TxStatusBatchRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.TxIds) > 0 { + for iNdEx := len(m.TxIds) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.TxIds[iNdEx]) + copy(dAtA[i:], m.TxIds[iNdEx]) + i = encodeVarintTx(dAtA, i, uint64(len(m.TxIds[iNdEx]))) + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func (m *TxStatusResult) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *TxStatusResult) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *TxStatusResult) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Status != nil { + { + size, err := m.Status.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintTx(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + if len(m.TxHash) > 0 { + i -= len(m.TxHash) + copy(dAtA[i:], m.TxHash) + i = encodeVarintTx(dAtA, i, uint64(len(m.TxHash))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *TxStatusBatchResponse) Marshal() (dAtA []byte, 
err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *TxStatusBatchResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *TxStatusBatchResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Statuses) > 0 { + for iNdEx := len(m.Statuses) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Statuses[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintTx(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + func encodeVarintTx(dAtA []byte, offset int, v uint64) int { offset -= sovTx(v) base := offset @@ -495,6 +802,53 @@ func (m *TxStatusResponse) Size() (n int) { return n } +func (m *TxStatusBatchRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.TxIds) > 0 { + for _, s := range m.TxIds { + l = len(s) + n += 1 + l + sovTx(uint64(l)) + } + } + return n +} + +func (m *TxStatusResult) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.TxHash) + if l > 0 { + n += 1 + l + sovTx(uint64(l)) + } + if m.Status != nil { + l = m.Status.Size() + n += 1 + l + sovTx(uint64(l)) + } + return n +} + +func (m *TxStatusBatchResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Statuses) > 0 { + for _, e := range m.Statuses { + l = e.Size() + n += 1 + l + sovTx(uint64(l)) + } + } + return n +} + func sovTx(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } @@ -856,6 +1210,290 @@ func (m *TxStatusResponse) Unmarshal(dAtA []byte) error { } return nil } +func (m *TxStatusBatchRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTx + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: TxStatusBatchRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: TxStatusBatchRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field TxIds", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTx + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthTx + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthTx + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.TxIds = append(m.TxIds, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipTx(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthTx + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *TxStatusResult) 
Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTx + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: TxStatusResult: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: TxStatusResult: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field TxHash", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTx + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthTx + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthTx + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.TxHash = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Status", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTx + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthTx + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthTx + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Status == nil { + m.Status = &TxStatusResponse{} + } + if err := m.Status.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipTx(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthTx + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *TxStatusBatchResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTx + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: TxStatusBatchResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: TxStatusBatchResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Statuses", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTx + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthTx + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return 
ErrInvalidLengthTx + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Statuses = append(m.Statuses, &TxStatusResult{}) + if err := m.Statuses[len(m.Statuses)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipTx(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthTx + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipTx(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/app/grpc/tx/tx.pb.gw.go b/app/grpc/tx/tx.pb.gw.go index 8cec56bc0a..614c9efc10 100644 --- a/app/grpc/tx/tx.pb.gw.go +++ b/app/grpc/tx/tx.pb.gw.go @@ -87,6 +87,40 @@ func local_request_Tx_TxStatus_0(ctx context.Context, marshaler runtime.Marshale } +func request_Tx_TxStatusBatch_0(ctx context.Context, marshaler runtime.Marshaler, client TxClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq TxStatusBatchRequest + var metadata runtime.ServerMetadata + + newReader, berr := utilities.IOReaderFactory(req.Body) + if berr != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", berr) + } + if err := marshaler.NewDecoder(newReader()).Decode(&protoReq); err != nil && err != io.EOF { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + + msg, err := client.TxStatusBatch(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + return msg, metadata, err + +} + +func local_request_Tx_TxStatusBatch_0(ctx context.Context, marshaler runtime.Marshaler, server TxServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq TxStatusBatchRequest + var metadata runtime.ServerMetadata + + newReader, berr := utilities.IOReaderFactory(req.Body) + if berr != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", berr) + } + if err := marshaler.NewDecoder(newReader()).Decode(&protoReq); err != nil && err != io.EOF { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + + msg, err := server.TxStatusBatch(ctx, &protoReq) + return msg, metadata, err + +} + // RegisterTxHandlerServer registers the http handlers for service Tx to "mux". // UnaryRPC :call TxServer directly. // StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906. 
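The handlers registered in the next hunks expose Tx/TxStatusBatch over REST as a POST route at /celestia/core/v1/tx/batch (derived from the pattern components registered below). A hedged sketch of calling that route; the gateway host and port are assumptions:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"log"
	"net/http"
)

func main() {
	// localhost:1317 is an assumed REST/gateway address; the path comes from the
	// pattern components {"celestia", "core", "v1", "tx", "batch"} registered below.
	body := bytes.NewBufferString(`{"tx_ids": ["<64-char hex tx hash>"]}`)
	resp, err := http.Post("http://localhost:1317/celestia/core/v1/tx/batch", "application/json", body)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	out, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Fatal(err)
	}
	// Expected shape: {"statuses": [{"tx_hash": "...", "status": {...}}]}
	fmt.Println(string(out))
}
```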
@@ -116,6 +150,29 @@ func RegisterTxHandlerServer(ctx context.Context, mux *runtime.ServeMux, server
 	})

+	mux.Handle("POST", pattern_Tx_TxStatusBatch_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
+		ctx, cancel := context.WithCancel(req.Context())
+		defer cancel()
+		var stream runtime.ServerTransportStream
+		ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
+		inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
+		rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req)
+		if err != nil {
+			runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
+			return
+		}
+		resp, md, err := local_request_Tx_TxStatusBatch_0(rctx, inboundMarshaler, server, req, pathParams)
+		md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
+		ctx = runtime.NewServerMetadataContext(ctx, md)
+		if err != nil {
+			runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
+			return
+		}
+
+		forward_Tx_TxStatusBatch_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
+
+	})
+
 	return nil
 }

@@ -177,13 +234,37 @@ func RegisterTxHandlerClient(ctx context.Context, mux *runtime.ServeMux, client
 	})

+	mux.Handle("POST", pattern_Tx_TxStatusBatch_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
+		ctx, cancel := context.WithCancel(req.Context())
+		defer cancel()
+		inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
+		rctx, err := runtime.AnnotateContext(ctx, mux, req)
+		if err != nil {
+			runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
+			return
+		}
+		resp, md, err := request_Tx_TxStatusBatch_0(rctx, inboundMarshaler, client, req, pathParams)
+		ctx = runtime.NewServerMetadataContext(ctx, md)
+		if err != nil {
+			runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
+			return
+		}
+
+		forward_Tx_TxStatusBatch_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
+
+	})
+
 	return nil
 }

 var (
 	pattern_Tx_TxStatus_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3, 1, 0, 4, 1, 5, 4}, []string{"celestia", "core", "v1", "tx", "tx_id"}, "", runtime.AssumeColonVerbOpt(false)))
+
+	pattern_Tx_TxStatusBatch_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3, 2, 4}, []string{"celestia", "core", "v1", "tx", "batch"}, "", runtime.AssumeColonVerbOpt(false)))
 )

 var (
 	forward_Tx_TxStatus_0 = runtime.ForwardResponseMessage
+
+	forward_Tx_TxStatusBatch_0 = runtime.ForwardResponseMessage
 )
diff --git a/pkg/user/pruning_test.go b/pkg/user/pruning_test.go
index 7385f561f3..4b50cf54ab 100644
--- a/pkg/user/pruning_test.go
+++ b/pkg/user/pruning_test.go
@@ -3,14 +3,14 @@ package user

 import (
 	"fmt"
 	"testing"
 	"time"

 	"github.com/stretchr/testify/require"
 )

 func TestPruningInTxTracker(t *testing.T) {
 	txClient := &TxClient{
-		txTracker: make(map[string]txInfo),
+		TxTracker: NewTxTracker(),
 	}

 	numTransactions := 10

 	for i := range numTransactions {
 		// 5 transactions will be pruned
 		if i%2 == 0 {
-			txClient.txTracker["tx"+fmt.Sprint(i)] = txInfo{
-				signer:   "signer" + fmt.Sprint(i),
-				sequence: uint64(i),
-				timestamp: time.Now().
-					Add(-10 * time.Minute),
-			}
+			txClient.TxTracker.trackTransaction("signer"+fmt.Sprint(i), uint64(i), "tx"+fmt.Sprint(i), []byte(fmt.Sprintf("tx%d", i)))
+			// Backdate the entry past the pruning interval so it is eligible for pruning.
+			txClient.TxTracker.TxQueue["tx"+fmt.Sprint(i)].timestamp = time.Now().Add(-10 * time.Minute)
 			txsToBePruned++
 		} else {
-			txClient.txTracker["tx"+fmt.Sprint(i)] = txInfo{
-				signer:   "signer" + fmt.Sprint(i),
-				sequence: uint64(i),
-				timestamp: time.Now().
-					Add(-5 * time.Minute),
-			}
+			txClient.TxTracker.trackTransaction("signer"+fmt.Sprint(i), uint64(i), "tx"+fmt.Sprint(i), []byte(fmt.Sprintf("tx%d", i)))
+			// Backdate by less than the pruning interval so it survives pruning.
+			txClient.TxTracker.TxQueue["tx"+fmt.Sprint(i)].timestamp = time.Now().Add(-5 * time.Minute)
 			txsNotReadyToBePruned++
 		}
 	}

-	txTrackerBeforePruning := len(txClient.txTracker)
+	txTrackerBeforePruning := len(txClient.TxTracker.TxQueue)

 	// All transactions were indexed
-	require.Equal(t, numTransactions, len(txClient.txTracker))
-	txClient.pruneTxTracker()
+	require.Equal(t, numTransactions, len(txClient.TxTracker.TxQueue))
+	txClient.TxTracker.pruneTxTracker()

 	// Prunes the transactions that are 10 minutes old
 	// 5 transactions will be pruned
 	require.Equal(t, txsNotReadyToBePruned, txTrackerBeforePruning-txsToBePruned)
-	require.Equal(t, len(txClient.txTracker), txsNotReadyToBePruned)
+	require.Equal(t, len(txClient.TxTracker.TxQueue), txsNotReadyToBePruned)
 }
diff --git a/pkg/user/tx_client.go b/pkg/user/tx_client.go
index 577529eb30..fa89bc1481 100644
--- a/pkg/user/tx_client.go
+++ b/pkg/user/tx_client.go
@@ -31,7 +31,6 @@ import (
 	codectypes "github.com/cosmos/cosmos-sdk/codec/types"
 	"github.com/cosmos/cosmos-sdk/crypto/keyring"
 	sdktypes "github.com/cosmos/cosmos-sdk/types"
-	sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
 	sdktx "github.com/cosmos/cosmos-sdk/types/tx"
 	paramtypes "github.com/cosmos/cosmos-sdk/x/params/types/proposal"
 	"go.opentelemetry.io/otel/attribute"
@@ -58,15 +59,6 @@ var (

 type Option func(client *TxClient)

-// txInfo is a struct that holds the sequence and the signer of a transaction
-// in the local tx pool.
-type txInfo struct {
-	sequence  uint64
-	signer    string
-	timestamp time.Time
-	txBytes   []byte
-}
-
 // TxResponse is a response from the chain after
 // a transaction has been submitted.
 type TxResponse struct {
@@ -164,9 +156,9 @@ func WithDefaultAccount(name string) Option {
 		c.defaultAddress = addr

 		// Update worker 0's account if tx queue already exists
-		if c.txQueue != nil && len(c.txQueue.workers) > 0 {
-			c.txQueue.workers[0].accountName = name
-			c.txQueue.workers[0].address = addr.String()
+		if c.parallelTxQueue != nil && len(c.parallelTxQueue.workers) > 0 {
+			c.parallelTxQueue.workers[0].accountName = name
+			c.parallelTxQueue.workers[0].address = addr.String()
 		}
 	}
 }
@@ -199,7 +191,7 @@ func WithTxWorkers(numWorkers int) Option {
 	}

 	return func(c *TxClient) {
-		c.txQueue = newTxQueue(c, numWorkers)
+		c.parallelTxQueue = newTxQueue(c, numWorkers)
 	}
 }

@@ -207,8 +199,8 @@
 // Default is 100 if not specified.
 func WithParallelQueueSize(size int) Option {
 	return func(c *TxClient) {
-		if c.txQueue != nil {
-			c.txQueue.jobQueue = make(chan *SubmissionJob, size)
+		if c.parallelTxQueue != nil {
+			c.parallelTxQueue.jobQueue = make(chan *SubmissionJob, size)
 		}
 	}
 }
@@ -230,10 +222,10 @@ type TxClient struct {
 	defaultAddress sdktypes.AccAddress
-	// txTracker maps the tx hash to the Sequence and signer of the transaction
-	// that was submitted to the chain
-	txTracker           map[string]txInfo
+	// TxTracker records the sequence, signer, and raw bytes of transactions
+	// that were submitted to the chain
+	TxTracker           *TxTracker
 	gasEstimationClient gasestimation.GasEstimatorClient
-	// txQueue manages parallel transaction submission when enabled
-	txQueue *txQueue
+	// parallelTxQueue manages parallel transaction submission when enabled
+	parallelTxQueue *txQueue
 }

 // NewTxClient returns a new TxClient
@@ -265,7 +257,7 @@ func NewTxClient(
 		pollTime:            DefaultPollTime,
 		defaultAccount:      records[0].Name,
 		defaultAddress:      addr,
-		txTracker:           make(map[string]txInfo),
+		TxTracker:           NewTxTracker(),
 		cdc:                 cdc,
 		gasEstimationClient: gasestimation.NewGasEstimatorClient(conn),
 	}
@@ -276,8 +268,8 @@

 	// Always create a tx queue with at least 1 worker (the default account)
 	// unless already configured by WithTxWorkers option
-	if txClient.txQueue == nil {
-		txClient.txQueue = newTxQueue(txClient, 1)
+	if txClient.parallelTxQueue == nil {
+		txClient.parallelTxQueue = newTxQueue(txClient, 1)
 	}

 	return txClient, nil
@@ -335,7 +327,7 @@ func SetupTxClient(
 		return nil, err
 	}

-	if err := txClient.txQueue.start(ctx); err != nil {
+	if err := txClient.parallelTxQueue.start(ctx); err != nil {
 		return nil, fmt.Errorf("failed to start tx queue: %w", err)
 	}

@@ -371,12 +363,12 @@ func (client *TxClient) SubmitPayForBlobToQueue(ctx context.Context, blobs []*sh
 // to the provided channel when the transaction is confirmed. The caller is responsible for creating and
 // closing the result channel.
 func (client *TxClient) QueueBlob(ctx context.Context, resultC chan SubmissionResult, blobs []*share.Blob, opts ...TxOption) {
-	if client.txQueue == nil {
+	if client.parallelTxQueue == nil {
 		resultC <- SubmissionResult{Error: errTxQueueNotConfigured}
 		return
 	}

-	if !client.txQueue.isStarted() {
+	if !client.parallelTxQueue.isStarted() {
 		resultC <- SubmissionResult{Error: errTxQueueNotStarted}
 		return
 	}
@@ -388,13 +380,13 @@ func (client *TxClient) QueueBlob(ctx context.Context, resultC chan SubmissionRe
 		ResultsC: resultC,
 	}

-	client.txQueue.submitJob(job)
+	client.parallelTxQueue.submitJob(job)
 }

 // SubmitPayForBlobWithAccount forms a transaction from the provided blobs, signs it with the provided account, and submits it to the chain.
 // TxOptions may be provided to set the fee and gas limit.
 func (client *TxClient) SubmitPayForBlobWithAccount(ctx context.Context, accountName string, blobs []*share.Blob, opts ...TxOption) (*TxResponse, error) {
-	resp, err := client.BroadcastPayForBlobWithAccount(ctx, accountName, blobs, opts...)
+	resp, _, err := client.BroadcastPayForBlobWithAccount(ctx, accountName, blobs, opts...)
 	if err != nil {
 		return nil, err
 	}
@@ -413,23 +405,24 @@
 // If no gas or gas price is set, it will estimate the gas and use
 // the max effective gas price: max(localMinGasPrice, networkMinGasPrice).
 func (client *TxClient) BroadcastPayForBlob(ctx context.Context, blobs []*share.Blob, opts ...TxOption) (*sdktypes.TxResponse, error) {
-	return client.BroadcastPayForBlobWithAccount(ctx, client.defaultAccount, blobs, opts...)
+	resp, _, err := client.BroadcastPayForBlobWithAccount(ctx, client.defaultAccount, blobs, opts...)
+	return resp, err
 }

-func (client *TxClient) BroadcastPayForBlobWithAccount(ctx context.Context, accountName string, blobs []*share.Blob, opts ...TxOption) (*sdktypes.TxResponse, error) {
+func (client *TxClient) BroadcastPayForBlobWithAccount(ctx context.Context, accountName string, blobs []*share.Blob, opts ...TxOption) (*sdktypes.TxResponse, []byte, error) {
 	client.mtx.Lock()
 	defer client.mtx.Unlock()
 	if err := client.checkAccountLoaded(ctx, accountName); err != nil {
-		return nil, err
+		return nil, nil, err
 	}

 	acc, exists := client.signer.accounts[accountName]
 	if !exists {
-		return nil, fmt.Errorf("account %s not found", accountName)
+		return nil, nil, fmt.Errorf("account %s not found", accountName)
 	}
 	signer := acc.Address().String()

 	msg, err := blobtypes.NewMsgPayForBlobs(signer, 0, blobs...)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}

 	gasLimit := blobtypes.DefaultEstimateGas(msg)
 	fee := uint64(math.Ceil(appconsts.DefaultMinGasPrice * float64(gasLimit)))
@@ -438,10 +431,15 @@ func (client *TxClient) BroadcastPayForBlobWithAccount(ctx context.Context, acco

 	txBytes, _, err := client.signer.CreatePayForBlobs(accountName, blobs, opts...)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
+	}
+
+	resp, err := client.routeTx(ctx, txBytes, accountName)
+	if err != nil {
+		return nil, nil, err
 	}

-	return client.routeTx(ctx, txBytes, accountName)
+	return resp, txBytes, nil
 }

 // SubmitTx forms a transaction from the provided messages, signs it, and submits it to the chain. TxOptions
@@ -462,7 +460,7 @@ func (client *TxClient) BroadcastTx(ctx context.Context, msgs []sdktypes.Msg, op
 	// prune transactions that are older than 10 minutes
 	// pruning has to be done in broadcast, since users
 	// might not always call ConfirmTx().
-	client.pruneTxTracker()
+	client.TxTracker.pruneTxTracker()

 	account, err := client.getAccountNameFromMsgs(msgs)
 	if err != nil {
@@ -494,31 +492,13 @@
 		}
 		gasLimit, err = client.estimateGas(ctx, txBuilder)
 		if err != nil {
-			// If not a sequence mismatch, return the error.
-			if !strings.Contains(err.Error(), sdkerrors.ErrWrongSequence.Error()) {
-				return nil, err
-			}
-
-			// Handle the sequence mismatch path by setting the sequence to the expected sequence
-			// and retrying gas estimation.
-			parsedErr := extractSequenceError(err.Error())
-
-			expectedSequence, err := apperrors.ParseExpectedSequence(parsedErr)
-			if err != nil {
-				return nil, fmt.Errorf("parsing sequence mismatch: %w. RawLog: %s", err, err)
-			}
-
-			if err = client.signer.SetSequence(account, expectedSequence); err != nil {
-				return nil, fmt.Errorf("setting sequence: %w", err)
-			}
-
-			// Retry gas estimation with the corrected sequence.
-			gasLimit, err = client.estimateGas(ctx, txBuilder)
-			if err != nil {
-				return nil, fmt.Errorf("retrying gas estimation: %w", err)
-			}
+			return nil, err
 		}
 		txBuilder.SetGasLimit(gasLimit)
 	}

 	if !hasUserSetFee {
@@ -550,14 +550,14 @@ func (client *TxClient) routeTx(ctx context.Context, txBytes []byte, signer stri
 		return client.submitToMultipleConnections(ctx, txBytes, signer)
 	}
 	span.AddEvent("txclient: broadcasting PFB to single endpoint")
-	return client.submitToSingleConnection(ctx, txBytes, signer)
+	return client.SubmitToSingleConnectionWithoutRetry(ctx, txBytes, signer)
 }

 // submitToSingleConnection handles submission to a single connection with retry logic at sequence mismatches and sequence management
 func (client *TxClient) submitToSingleConnection(ctx context.Context, txBytes []byte, signer string) (*sdktypes.TxResponse, error) {
 	span := trace.SpanFromContext(ctx)

-	resp, err := client.sendTxToConnection(ctx, client.conns[0], txBytes)
+	resp, err := client.SendTxToConnection(ctx, client.conns[0], txBytes)
 	if err != nil {
 		broadcastTxErr, ok := err.(*BroadcastTxError)
 		if !ok || !apperrors.IsNonceMismatchCode(broadcastTxErr.Code) {
@@ -579,11 +579,33 @@
 		}
 		span.AddEvent("txclient/submitToSingleConnection: successfully rebroadcasted tx after sequence mismatch")
-		return client.submitToSingleConnection(ctx, retryTxBytes, signer)
+		return client.SubmitToSingleConnectionWithoutRetry(ctx, retryTxBytes, signer)
 	}
 	// Save the sequence, signer and txBytes of the in the local txTracker
 	// before the sequence is incremented
-	client.trackTransaction(signer, resp.TxHash, txBytes)
+	sequence := client.signer.Account(signer).Sequence()
+	client.TxTracker.trackTransaction(signer, sequence, resp.TxHash, txBytes)
+
+	// Increment sequence after successful submission
+	if err := client.signer.IncrementSequence(signer); err != nil {
+		return nil, fmt.Errorf("error incrementing sequence: %w", err)
+	}
+
+	return resp, nil
+}
+
+// SubmitToSingleConnectionWithoutRetry broadcasts to the first connection and
+// tracks the transaction, without retrying on sequence mismatches.
+func (client *TxClient) SubmitToSingleConnectionWithoutRetry(ctx context.Context, txBytes []byte, signer string) (*sdktypes.TxResponse, error) {
+	resp, err := client.SendTxToConnection(ctx, client.conns[0], txBytes)
+	if err != nil {
+		return nil, err
+	}
+
+	// Save the sequence, signer and txBytes of the transaction in the local txTracker
+	// before the sequence is incremented
+	sequence := client.signer.Account(signer).Sequence()
+	client.TxTracker.trackTransaction(signer, sequence, resp.TxHash, txBytes)

 	// Increment sequence after successful submission
 	if err := client.signer.IncrementSequence(signer); err != nil {
@@ -594,8 +616,8 @@
 }

-// sendTxToConnection broadcasts a transaction to the chain and returns the response.
+// SendTxToConnection broadcasts a transaction to the chain and returns the response.
-func (client *TxClient) sendTxToConnection(ctx context.Context, conn *grpc.ClientConn, txBytes []byte) (*sdktypes.TxResponse, error) { +func (client *TxClient) SendTxToConnection(ctx context.Context, conn *grpc.ClientConn, txBytes []byte) (*sdktypes.TxResponse, error) { span := trace.SpanFromContext(ctx) resp, err := sdktx.NewServiceClient(conn).BroadcastTx( @@ -695,7 +717,7 @@ func (client *TxClient) submitToMultipleConnections(ctx context.Context, txBytes go func(conn *grpc.ClientConn) { defer wg.Done() - resp, err := client.sendTxToConnection(ctx, conn, txBytes) + resp, err := client.SendTxToConnection(ctx, conn, txBytes) if err != nil { errCh <- err return @@ -720,7 +742,8 @@ func (client *TxClient) submitToMultipleConnections(ctx context.Context, txBytes // Return first successful response, if any if resp, ok := <-respCh; ok && resp != nil { - client.trackTransaction(signer, resp.TxHash, txBytes) + sequence := client.signer.Account(signer).Sequence() + client.TxTracker.trackTransaction(signer, sequence, resp.TxHash, txBytes) if err := client.signer.IncrementSequence(signer); err != nil { return nil, fmt.Errorf("increment sequencing: %w", err) @@ -736,15 +759,6 @@ func (client *TxClient) submitToMultipleConnections(ctx context.Context, txBytes return nil, errors.Join(errs...) } -// pruneTxTracker removes transactions from the local tx tracker that are older than 10 minutes -func (client *TxClient) pruneTxTracker() { - for hash, txInfo := range client.txTracker { - if time.Since(txInfo.timestamp) >= txTrackerPruningInterval { - delete(client.txTracker, hash) - } - } -} - // ConfirmTx periodically pings the provided node for the commitment of a transaction by its // hash. It will continually loop until the context is cancelled, the tx is found or an error // is encountered. 
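Since BroadcastPayForBlobWithAccount now also returns the signed transaction bytes, a caller that broadcasts and then confirms might look like the following sketch; the client, blobs, and the "default" account name are assumptions set up elsewhere (e.g. via user.SetupTxClient):

```go
package main

import (
	"context"
	"fmt"

	"github.com/celestiaorg/celestia-app/v6/pkg/user"
	"github.com/celestiaorg/go-square/v3/share"
)

// submitAndConfirm is a hedged sketch, not part of this change.
func submitAndConfirm(ctx context.Context, client *user.TxClient, blobs []*share.Blob) error {
	resp, txBytes, err := client.BroadcastPayForBlobWithAccount(ctx, "default", blobs)
	if err != nil {
		return err
	}
	// txBytes is the exact signed transaction; it could be re-broadcast
	// verbatim if the transaction is later evicted from the mempool.
	_ = txBytes

	confirmed, err := client.ConfirmTx(ctx, resp.TxHash)
	if err != nil {
		return err
	}
	fmt.Printf("tx %s committed at height %d\n", resp.TxHash, confirmed.Height)
	return nil
}
```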
@@ -780,15 +794,15 @@ func (client *TxClient) ConfirmTx(ctx context.Context, txHash string) (*TxRespon )) if resp.ExecutionCode != abci.CodeTypeOK { span.RecordError(fmt.Errorf("txclient/ConfirmTx: execution error: %s", resp.Error)) - client.deleteFromTxTracker(txHash) + client.TxTracker.deleteFromTxTracker(txHash) return nil, client.buildExecutionError(txHash, resp) } span.AddEvent("txclient/ConfirmTx: transaction confirmed successfully") - client.deleteFromTxTracker(txHash) + client.TxTracker.deleteFromTxTracker(txHash) return client.buildTxResponse(txHash, resp), nil case core.TxStatusEvicted: - _, _, txBytes, exists := client.GetTxFromTxTracker(txHash) + _, _, txBytes, exists := client.TxTracker.GetTxFromTxTracker(txHash) if !exists { return nil, fmt.Errorf("tx: %s not found in txTracker; likely failed during broadcast", txHash) } @@ -804,7 +818,7 @@ func (client *TxClient) ConfirmTx(ctx context.Context, txHash string) (*TxRespon )) // If we're not already tracking eviction timeout, try to resubmit - _, err := client.sendTxToConnection(ctx, client.conns[0], txBytes) + _, err := client.SendTxToConnection(ctx, client.conns[0], txBytes) if err != nil { // Check if the error is a broadcast tx error _, ok := err.(*BroadcastTxError) @@ -819,7 +833,7 @@ func (client *TxClient) ConfirmTx(ctx context.Context, txHash string) (*TxRespon span.AddEvent("txclient/ConfirmTx: transaction resubmitted successfully after eviction") case core.TxStatusRejected: span.RecordError(fmt.Errorf("txclient/ConfirmTx: transaction rejected: %s", resp.Error)) - sequence, signer, _, exists := client.GetTxFromTxTracker(txHash) + sequence, signer, _, exists := client.TxTracker.GetTxFromTxTracker(txHash) if !exists { return nil, fmt.Errorf("tx: %s not found in tx client txTracker; likely failed during broadcast", txHash) } @@ -828,11 +842,11 @@ func (client *TxClient) ConfirmTx(ctx context.Context, txHash string) (*TxRespon if err := client.signer.SetSequence(signer, sequence); err != nil { return nil, fmt.Errorf("setting sequence: %w", err) } - client.deleteFromTxTracker(txHash) + client.TxTracker.deleteFromTxTracker(txHash) return nil, fmt.Errorf("tx with hash %s was rejected by the node with execution code: %d and log: %s", txHash, resp.ExecutionCode, resp.Error) default: span.RecordError(fmt.Errorf("txclient/ConfirmTx: unknown tx status for tx: %s", txHash)) - client.deleteFromTxTracker(txHash) + client.TxTracker.deleteFromTxTracker(txHash) if ctx.Err() != nil { return nil, ctx.Err() } @@ -861,13 +875,6 @@ func extractSequenceError(fullError string) string { return s } -// deleteFromTxTracker safely deletes a transaction from the local tx tracker. -func (client *TxClient) deleteFromTxTracker(txHash string) { - client.mtx.Lock() - defer client.mtx.Unlock() - delete(client.txTracker, txHash) -} - // EstimateGasPriceAndUsage returns the estimated gas price based on the provided priority, // and also the gas limit/used for the provided transaction. // The gas limit is calculated by simulating the transaction and then calculating the amount of gas that was consumed during execution. @@ -1045,26 +1052,6 @@ func (client *TxClient) getAccountNameFromMsgs(msgs []sdktypes.Msg) (string, err return record.Name, nil } -// trackTransaction tracks a transaction without acquiring the mutex. -// This should only be called when the caller already holds the mutex. 
-func (client *TxClient) trackTransaction(signer, txHash string, txBytes []byte) { - sequence := client.signer.Account(signer).Sequence() - client.txTracker[txHash] = txInfo{ - sequence: sequence, - signer: signer, - timestamp: time.Now(), - txBytes: txBytes, - } -} - -// GetTxFromTxTracker gets transaction info from the tx client's local tx tracker by its hash -func (client *TxClient) GetTxFromTxTracker(hash string) (sequence uint64, signer string, txBytes []byte, exists bool) { - client.mtx.Lock() - defer client.mtx.Unlock() - txInfo, exists := client.txTracker[hash] - return txInfo.sequence, txInfo.signer, txInfo.txBytes, exists -} - // Signer exposes the tx clients underlying signer func (client *TxClient) Signer() *Signer { return client.signer @@ -1073,51 +1060,51 @@ func (client *TxClient) Signer() *Signer { // StartTxQueueForTest starts the tx queue for testing purposes. // This function is only intended for use in tests. func (client *TxClient) StartTxQueueForTest(ctx context.Context) error { - if client.txQueue == nil { + if client.parallelTxQueue == nil { return nil } - return client.txQueue.start(ctx) + return client.parallelTxQueue.start(ctx) } // StopTxQueueForTest stops the tx queue for testing purposes. // This function is only intended for use in tests. func (client *TxClient) StopTxQueueForTest() { - if client.txQueue != nil { - client.txQueue.stop() + if client.parallelTxQueue != nil { + client.parallelTxQueue.stop() } } // IsTxQueueStartedForTest returns whether the tx queue is started, for testing purposes. // This function is only intended for use in tests. func (client *TxClient) IsTxQueueStartedForTest() bool { - if client.txQueue == nil { + if client.parallelTxQueue == nil { return false } - return client.txQueue.isStarted() + return client.parallelTxQueue.isStarted() } // TxQueueWorkerCount returns the number of workers in the tx queue func (client *TxClient) TxQueueWorkerCount() int { - if client.txQueue == nil { + if client.parallelTxQueue == nil { return 0 } - return len(client.txQueue.workers) + return len(client.parallelTxQueue.workers) } // TxQueueWorkerAddress returns the address for the worker at the given index func (client *TxClient) TxQueueWorkerAddress(index int) string { - if client.txQueue == nil || index < 0 || index >= len(client.txQueue.workers) { + if client.parallelTxQueue == nil || index < 0 || index >= len(client.parallelTxQueue.workers) { return "" } - return client.txQueue.workers[index].address + return client.parallelTxQueue.workers[index].address } // TxQueueWorkerAccountName returns the account name for the worker at the given index func (client *TxClient) TxQueueWorkerAccountName(index int) string { - if client.txQueue == nil || index < 0 || index >= len(client.txQueue.workers) { + if client.parallelTxQueue == nil || index < 0 || index >= len(client.parallelTxQueue.workers) { return "" } - return client.txQueue.workers[index].accountName + return client.parallelTxQueue.workers[index].accountName } // QueryMinimumGasPrice queries both the nodes local and network wide diff --git a/pkg/user/tx_client_test.go b/pkg/user/tx_client_test.go index 192cb221eb..c56d3a985d 100644 --- a/pkg/user/tx_client_test.go +++ b/pkg/user/tx_client_test.go @@ -342,7 +342,7 @@ func TestRejections(t *testing.T) { seqAfterConfirmation := sender.Sequence() require.Equal(t, seqBeforeSubmission+1, seqAfterConfirmation) // Was removed from the tx tracker - _, _, _, exists := txClient.GetTxFromTxTracker(resp.TxHash) + _, _, _, exists := 
txClient.TxTracker.GetTxFromTxTracker(resp.TxHash) require.False(t, exists) } @@ -393,7 +393,7 @@ func TestEvictions(t *testing.T) { require.NoError(t, err) require.Equal(t, res.Code, abci.CodeTypeOK) // They should be removed from the tx tracker after confirmation - _, _, _, exists := txClient.GetTxFromTxTracker(resp.TxHash) + _, _, _, exists := txClient.TxTracker.GetTxFromTxTracker(resp.TxHash) require.False(t, exists) } @@ -565,13 +565,13 @@ func (suite *TxClientTestSuite) queryCurrentBalance(t *testing.T) int64 { } func wasRemovedFromTxTracker(txHash string, txClient *user.TxClient) bool { - seq, signer, txBytes, exists := txClient.GetTxFromTxTracker(txHash) + seq, signer, txBytes, exists := txClient.TxTracker.GetTxFromTxTracker(txHash) return !exists && seq == 0 && signer == "" && len(txBytes) == 0 } // assertTxInTxTracker verifies that a tx was indexed in the tx tracker and that the sequence increases by one after broadcast. func assertTxInTxTracker(t *testing.T, txClient *user.TxClient, txHash, expectedSigner string, seqBeforeBroadcast uint64) { - seqFromTxTracker, signer, txBytes, exists := txClient.GetTxFromTxTracker(txHash) + seqFromTxTracker, signer, txBytes, exists := txClient.TxTracker.GetTxFromTxTracker(txHash) require.True(t, exists) require.Equal(t, expectedSigner, signer) seqAfterBroadcast := txClient.Signer().Account(expectedSigner).Sequence() @@ -827,7 +827,7 @@ func (suite *TxClientTestSuite) TestSequenceIncrementOnlyOnceInMultiConnBroadcas require.Equal(t, seqBefore+1, seqAfter, "Sequence should be incremented by exactly 1, not by number of connections") // Verify the transaction is tracked - trackedSeq, trackedSigner, _, exists := multiConnClient.GetTxFromTxTracker(resp.TxHash) + trackedSeq, trackedSigner, _, exists := multiConnClient.TxTracker.GetTxFromTxTracker(resp.TxHash) require.True(t, exists, "Transaction should be in tracker") require.Equal(t, seqBefore, trackedSeq, "Tracked sequence should be the sequence before increment") require.Equal(t, multiConnClient.DefaultAccountName(), trackedSigner, "Tracked signer should match") diff --git a/pkg/user/tx_tracker.go b/pkg/user/tx_tracker.go new file mode 100644 index 0000000000..af8e380657 --- /dev/null +++ b/pkg/user/tx_tracker.go @@ -0,0 +1,104 @@ +package user + +import ( + "sync" + "time" +) + +// TxTracker tracks transaction metadata for submitted transactions. +// It stores the hash, sequence, signer, and raw bytes for each transaction. 
+// This is used for: +// - Resubmitting evicted transactions with the same bytes +// - Rolling back sequence numbers on rejection +// - Pruning old transactions to prevent memory leaks +type TxTracker struct { + mu sync.RWMutex + TxQueue map[string]*txInfo +} + +// txInfo contains metadata about a submitted transaction +type txInfo struct { + signer string + sequence uint64 + txBytes []byte + timestamp time.Time +} + +// NewTxTracker creates a new TxTracker instance +func NewTxTracker() *TxTracker { + return &TxTracker{ + TxQueue: make(map[string]*txInfo), + } +} + +// trackTransaction adds a transaction to the tracker +func (t *TxTracker) trackTransaction(signer string, sequence uint64, txHash string, txBytes []byte) { + t.mu.Lock() + defer t.mu.Unlock() + + t.TxQueue[txHash] = &txInfo{ + signer: signer, + sequence: sequence, + txBytes: txBytes, + timestamp: time.Now(), + } +} + +// GetTxFromTxTracker retrieves transaction metadata by hash +// Returns: sequence, signer, txBytes, exists +func (t *TxTracker) GetTxFromTxTracker(txHash string) (uint64, string, []byte, bool) { + t.mu.RLock() + defer t.mu.RUnlock() + + info, exists := t.TxQueue[txHash] + if !exists { + return 0, "", nil, false + } + + return info.sequence, info.signer, info.txBytes, true +} + +// RemoveTxFromTxTracker removes a transaction from the tracker +func (t *TxTracker) RemoveTxFromTxTracker(txHash string) { + t.mu.Lock() + defer t.mu.Unlock() + + delete(t.TxQueue, txHash) +} + +// deleteFromTxTracker removes a transaction from the tracker (alias for RemoveTxFromTxTracker) +func (t *TxTracker) deleteFromTxTracker(txHash string) { + t.RemoveTxFromTxTracker(txHash) +} + +// GetTxBytes retrieves the transaction bytes for a given account and sequence +// Returns nil if not found +func (t *TxTracker) GetTxBytes(accountName string, sequence uint64) []byte { + t.mu.RLock() + defer t.mu.RUnlock() + + // Search through all transactions to find one matching the account and sequence + for _, info := range t.TxQueue { + if info.signer == accountName && info.sequence == sequence { + return info.txBytes + } + } + + return nil +} + +// pruneTxTracker removes transactions older than txTrackerPruningInterval +// This prevents memory leaks from accumulating transaction history +func (t *TxTracker) pruneTxTracker() { + t.mu.Lock() + defer t.mu.Unlock() + + cutoffTime := time.Now().Add(-txTrackerPruningInterval) + + for txHash, info := range t.TxQueue { + if info.timestamp.Before(cutoffTime) { + delete(t.TxQueue, txHash) + } + } +} + diff --git a/pkg/user/v2/sequential_queue.go b/pkg/user/v2/sequential_queue.go new file mode 100644 index 0000000000..5cd46b88bc --- /dev/null +++ b/pkg/user/v2/sequential_queue.go @@ -0,0 +1,699 @@ +package v2 + +import ( + "context" + "fmt" + "regexp" + "strconv" + "sync" + "sync/atomic" + "time" + + "github.com/celestiaorg/celestia-app/v6/app/grpc/tx" + "github.com/celestiaorg/celestia-app/v6/pkg/user" + "github.com/celestiaorg/go-square/v3/share" + abci "github.com/cometbft/cometbft/abci/types" + "github.com/cometbft/cometbft/rpc/core" + sdktypes "github.com/cosmos/cosmos-sdk/types" +) + +// SequentialSubmissionJob represents a transaction submission task +type SequentialSubmissionJob struct { + Blobs []*share.Blob + Options []user.TxOption + Ctx context.Context + ResultsC chan SequentialSubmissionResult +} + +// SequentialSubmissionResult contains the result of a transaction submission +type SequentialSubmissionResult struct { + TxResponse *sdktypes.TxResponse + Error error +} + +// sequentialQueue 
manages single-threaded transaction submission with a unified queue
+type sequentialQueue struct {
+	wg     sync.WaitGroup
+	ctx    context.Context
+	cancel context.CancelFunc
+
+	client      *TxClient
+	accountName string
+	pollTime    time.Duration
+
+	// Single unified queue - transactions stay here until confirmed
+	mu               sync.RWMutex
+	queue            []*queuedTx    // All transactions from submission to confirmation
+	queueMemoryBytes uint64         // Total memory used by blobs in queue (in bytes)
+	maxMemoryBytes   uint64         // Maximum memory allowed for queue (in bytes)
+	ResignChan       chan *queuedTx // Channel for all rejected transactions that need to be resigned
+	ResubmitChan     chan *queuedTx // Channel for all evicted transactions that need to be resubmitted
+
+	// Track last confirmed sequence for rollback logic
+	lastConfirmedSeq uint64
+
+	// Track last rejected sequence for rollback logic
+	lastRejectedSeq uint64
+
+	isRecovering atomic.Bool
+
+	// Submission tracking metrics
+	newBroadcastCount uint64    // Count of new transaction broadcasts
+	resubmitCount     uint64    // Count of resubmissions (evicted txs)
+	resignCount       uint64    // Count of resignations (rejected txs)
+	lastMetricsLog    time.Time // Last time we logged metrics
+	metricsStartTime  time.Time // Start time for rate calculation
+}
+
+// queuedTx represents a transaction in the queue (from submission to confirmation)
+type queuedTx struct {
+	// Original submission data
+	blobs    []*share.Blob
+	options  []user.TxOption
+	resultsC chan SequentialSubmissionResult
+
+	// Set after broadcast
+	txHash         string    // Empty until broadcast
+	txBytes        []byte    // Set after broadcast, used for eviction resubmission
+	sequence       uint64    // Set after broadcast
+	submittedAt    time.Time // Set after broadcast
+	isResubmitting bool      // True if transaction is currently being resubmitted (prevents duplicates)
+}
+
+const (
+	// defaultMaxQueueMemoryMB is the maximum memory (in MB) allowed for the queue to prevent OOM
+	// This limits the total size of blob data held in memory at once
+	defaultMaxQueueMemoryMB = 200 // 200MB default
+)
+
+func newSequentialQueue(client *TxClient, accountName string, pollTime time.Duration) *sequentialQueue {
+	if pollTime == 0 {
+		pollTime = user.DefaultPollTime
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	now := time.Now()
+	q := &sequentialQueue{
+		client:           client,
+		accountName:      accountName,
+		pollTime:         pollTime,
+		ctx:              ctx,
+		cancel:           cancel,
+		queue:            make([]*queuedTx, 0),
+		queueMemoryBytes: 0,
+		maxMemoryBytes:   defaultMaxQueueMemoryMB * 1024 * 1024, // Convert MB to bytes
+		ResignChan:       make(chan *queuedTx, 10),
+		ResubmitChan:     make(chan *queuedTx, 10),
+		lastMetricsLog:   now,
+		metricsStartTime: now,
+	}
+	return q
+}
+
+// start begins the sequential queue processing
+func (q *sequentialQueue) start() {
+	q.wg.Add(2)
+	go func() {
+		defer q.wg.Done()
+		q.coordinate()
+	}()
+	go func() {
+		defer q.wg.Done()
+		q.monitorLoop()
+	}()
+}
+
+func (q *sequentialQueue) setRecovering(v bool) {
+	q.isRecovering.Store(v)
+}
+
+func (q *sequentialQueue) getRecovering() bool {
+	return q.isRecovering.Load()
+}
+
+// submitJob adds a new transaction to the queue
+// It enforces memory limits to prevent OOM by blocking until sufficient memory is available
+func (q *sequentialQueue) submitJob(job *SequentialSubmissionJob) {
+	// Calculate memory size of this transaction's blobs
+	blobsMemory := calculateBlobsMemory(job.Blobs)
+
+	// Wait for memory space in queue (backpressure) - prevents memory exhaustion
+	for {
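+		// Backpressure: each pass re-checks capacity under the lock. If the
+		// new blobs fit, the tx is enqueued and we return; otherwise we
+		// unlock and sleep briefly before retrying, so callers block instead
+		// of growing the queue without bound.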
q.mu.Lock()
+		if q.queueMemoryBytes+blobsMemory <= q.maxMemoryBytes {
+			// Sufficient memory available - add transaction
+			qTx := &queuedTx{
+				blobs:    job.Blobs,
+				options:  job.Options,
+				resultsC: job.ResultsC,
+			}
+			q.queue = append(q.queue, qTx)
+			q.queueMemoryBytes += blobsMemory
+
+			// Snapshot everything needed for logging while the lock is held,
+			// so the capacity check below does not race with other writers.
+			currentMemMB := float64(q.queueMemoryBytes) / (1024 * 1024)
+			maxMemMB := float64(q.maxMemoryBytes) / (1024 * 1024)
+			nearCapacity := q.queueMemoryBytes > (q.maxMemoryBytes * 80 / 100)
+			q.mu.Unlock()
+
+			// Log when approaching capacity (>80%)
+			if nearCapacity {
+				fmt.Printf("[MEMORY] Queue approaching capacity: %.2f/%.2f MB (%.1f%%)\n",
+					currentMemMB, maxMemMB, (currentMemMB/maxMemMB)*100)
+			}
+			return
+		}
+
+		// Queue memory full - unlock and wait for space
+		q.mu.Unlock()
+
+		select {
+		case <-time.After(100 * time.Millisecond):
+			// Wait a bit then retry
+		case <-q.ctx.Done():
+			// Context cancelled, exit
+			return
+		}
+	}
+}
+
+// calculateBlobsMemory returns the total memory size of blobs in bytes
+func calculateBlobsMemory(blobs []*share.Blob) uint64 {
+	var total uint64
+	for _, blob := range blobs {
+		if blob != nil {
+			total += uint64(len(blob.Data()))
+		}
+	}
+	return total
+}
+
+// GetQueueSize returns the number of transactions in the queue
+func (q *sequentialQueue) GetQueueSize() int {
+	q.mu.RLock()
+	defer q.mu.RUnlock()
+	return len(q.queue)
+}
+
+// processNextTx signs and broadcasts the next unbroadcast transaction in queue
+func (q *sequentialQueue) processNextTx() {
+	startTime := time.Now()
+
+	fmt.Println("Recovering?", q.getRecovering())
+	if q.getRecovering() {
+		fmt.Println("Recovering from previous rejection/eviction - skipping current tx")
+		return
+	}
+
+	scanStart := time.Now()
+	var qTx *queuedTx
+	q.mu.RLock()
+	for _, tx := range q.queue {
+		if tx.txHash == "" {
+			qTx = tx
+			break
+		}
+	}
+	queueSize := len(q.queue)
+	q.mu.RUnlock()
+	scanDuration := time.Since(scanStart)
+
+	if qTx == nil {
+		return
+	}
+
+	fmt.Printf("[TIMING] Queue scan took %v (queue size: %d)\n", scanDuration, queueSize)
+
+	// Log current signer sequence before broadcast
+	currentSeq := q.client.Signer().Account(q.accountName).Sequence()
+	fmt.Printf("[DEBUG] Attempting broadcast with signer sequence: %d\n", currentSeq)
+
+	broadcastStart := time.Now()
+	resp, txBytes, err := q.client.BroadcastPayForBlobWithoutRetry(
+		q.ctx,
+		q.accountName,
+		qTx.blobs,
+		qTx.options...,
+	)
+	broadcastDuration := time.Since(broadcastStart)
+	fmt.Printf("[TIMING] Broadcast call took %v\n", broadcastDuration)
+
+	if err != nil || resp.Code != 0 {
+		// Check if this is a sequence mismatch AND we're blocked
+		// This means the sequence was rolled back while we were broadcasting
+		// TODO: maybe we can check if q is blocked and if so, return
+		// otherwise it could mean client is stalled
+		if IsSequenceMismatchError(err) {
+			fmt.Println("Sequence mismatch error in broadcast: ", err)
+			// fmt.Println("Sequence mismatch error")
+			// // check expected sequence and check if there is transaction with that sequence
+			// expectedSeq := parseExpectedSequence(err.Error())
+			// // check if there is transaction with that sequence
+			// for _, txx := range q.queue {
+			// 	fmt.Println("expectedSeq: ", expectedSeq)
+			// 	if txx.sequence == expectedSeq {
+			// 		fmt.Printf("Found transaction with expected sequence with hash %s\n", txx.txHash[:16])
+			// 		// check status of tx
+			// 		txClient := tx.NewTxClient(q.client.GetGRPCConnection())
+			// 		statusResp, err := txClient.TxStatus(q.ctx, &tx.TxStatusRequest{TxId: txx.txHash})
+			// 		if err != nil {
+			// 			fmt.Printf("Failed to check status of tx %s: %v\n", txx.txHash[:16], err)
+			// 			continue
+			// 		}
+			// 		if statusResp.Status == core.TxStatusRejected {
+			// 			q.handleRejected(txx, statusResp, txClient)
+			// 		}
+			// 		fmt.Println("status for this expected hash: ", statusResp.Status)
+			// 		fmt.Println("status log: ", statusResp.Error)
+			// 		return
+			// 	}
+			// }
+			// Return because we are probably blocked; we will retry on the next pass.
+			return
+		}
+
+		// Other broadcast errors - send error and remove from queue.
+		// Guard against a nil err when the node returned a non-zero code.
+		if err == nil {
+			err = fmt.Errorf("node returned code %d", resp.Code)
+		}
+		select {
+		case qTx.resultsC <- SequentialSubmissionResult{
+			Error: fmt.Errorf("broadcast failed: %w", err),
+		}:
+		case <-q.ctx.Done():
+		}
+		q.removeFromQueue(qTx)
+		return
+	}
+
+	q.mu.Lock()
+	defer q.mu.Unlock()
+	// Broadcast successful - mark as broadcast in queue
+	sequence := q.client.Signer().Account(q.accountName).Sequence()
+
+	qTx.txHash = resp.TxHash
+	qTx.txBytes = txBytes
+	qTx.sequence = sequence - 1 // sequence is incremented after successful submission
+	qTx.submittedAt = time.Now()
+
+	fmt.Printf("Broadcast successful for tx %s - marking as broadcast in queue\n", qTx.txHash[:16])
+	fmt.Printf("[TIMING] Total processNextTx took %v\n", time.Since(startTime))
+}
+
+// monitorLoop periodically checks the status of broadcast transactions
+func (q *sequentialQueue) monitorLoop() {
+	ticker := time.NewTicker(2 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-q.ctx.Done():
+			return
+		case <-ticker.C:
+			q.checkBroadcastTransactions()
+		}
+	}
+}
+
+// coordinate coordinates transaction submission with confirmation
+func (q *sequentialQueue) coordinate() {
+	ticker := time.NewTicker(time.Second) // TODO: it's currently fine without additional delays. Might still be necessary though.
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-q.ctx.Done():
+			return
+		case <-q.ResignChan:
+			// TODO: decide if we want to do anything during rejections
+		case qTx := <-q.ResubmitChan:
+			q.ResubmitEvicted(qTx)
+		default:
+			q.processNextTx()
+		}
+	}
+}
+
+// TODO: come back to this and see if it makes sense
+// func (q *sequentialQueue) setTxInfo(qTx *queuedTx, resp *sdktypes.TxResponse, txBytes []byte, sequence uint64) {
+// 	q.mu.Lock()
+// 	defer q.mu.Unlock()
+
+// 	qTx.txHash = resp.TxHash
+// 	qTx.txBytes = txBytes
+// 	qTx.sequence = sequence
+// 	qTx.shouldResign = false
+// }
+
+func (q *sequentialQueue) ResubmitEvicted(qTx *queuedTx) {
+	startTime := time.Now()
+	fmt.Printf("Resubmitting evicted tx with hash %s and sequence %d\n", qTx.txHash[:16], qTx.sequence)
+	q.mu.RLock()
+	txBytes := qTx.txBytes
+	q.mu.RUnlock()
+
+	// check if the tx needs to be resubmitted
+	resubmitStart := time.Now()
+	resubmitResp, err := q.client.SendTxToConnection(q.ctx, q.client.GetGRPCConnection(), txBytes)
+	resubmitDuration := time.Since(resubmitStart)
+	fmt.Printf("[TIMING] Resubmit network call took %v\n", resubmitDuration)
+	if err != nil || resubmitResp.Code != 0 {
+		select {
+		case qTx.resultsC <- SequentialSubmissionResult{
+			Error: fmt.Errorf("evicted and failed to resubmit with hash %s: %w", qTx.txHash[:16], err),
+		}:
+		case <-q.ctx.Done():
+		}
+		// send error and remove from queue
+		q.removeFromQueue(qTx)
+		return
+	}
+
+	// Successful resubmission - reset flag and track metrics
+	q.mu.Lock()
+	qTx.isResubmitting = false
+	q.resubmitCount++
+	q.mu.Unlock()
+
+	// Exit recovery mode after successful resubmission to allow new txs to be broadcast
+	// q.setRecovering(false)
+
+	fmt.Printf("Successfully resubmitted tx %s\n", qTx.txHash[:16])
+	fmt.Printf("[TIMING] Total ResubmitEvicted took %v\n", time.Since(startTime))
+}
+
+// checkBroadcastTransactions checks status of all broadcast transactions
+func (q *sequentialQueue) checkBroadcastTransactions() {
+	startTime := time.Now()
+	fmt.Println("Checking broadcast transactions")
+
+	scanStart := time.Now()
+	q.mu.RLock()
+	// Collect broadcast transaction hashes (cap at 20 per batch for efficiency)
+	const maxBatchSize = 20
+	var broadcastTxHashes []string
+	var broadcastTxs []*queuedTx
+	for _, tx := range q.queue {
+		if tx.txHash != "" {
+			broadcastTxHashes = append(broadcastTxHashes, tx.txHash)
+			broadcastTxs = append(broadcastTxs, tx)
+			// Cap at 20 transactions per status check batch
+			if len(broadcastTxHashes) >= maxBatchSize {
+				break
+			}
+		}
+	}
+	totalQueueSize := len(q.queue)
+	q.mu.RUnlock()
+	scanDuration := time.Since(scanStart)
+
+	fmt.Printf("Total queue size: %d, Broadcast txs: %d\n", totalQueueSize, len(broadcastTxHashes))
+	fmt.Printf("[TIMING] Collecting broadcast txs scan took %v\n", scanDuration)
+
+	if len(broadcastTxHashes) == 0 {
+		return
+	}
+
+	// Create tx client for status queries
+	txClient := tx.NewTxClient(q.client.GetGRPCConnection())
+
+	statusCheckStart := time.Now()
+
+	// Try batch status check first
+	statusResp, err := txClient.TxStatusBatch(q.ctx, &tx.TxStatusBatchRequest{TxIds: broadcastTxHashes})
+
+	// If the batch status query fails, skip this round; the monitor will
+	// retry on the next tick.
+	if err != nil {
+		return
+	}
+
+	statusCheckCount := len(statusResp.Statuses)
+	for i, statusRespp := range statusResp.Statuses {
+		// broadcastTxs is a local snapshot, so no lock is needed to index it
+		qTx := broadcastTxs[i]
+		switch statusRespp.Status.Status {
+		case core.TxStatusCommitted:
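+			// Committed: handleCommitted checks the execution code, delivers
+			// the result to the submitter, and drops the tx from the queue.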
q.handleCommitted(broadcastTxs[i], statusRespp.Status)
+		case core.TxStatusEvicted:
+			q.setRecovering(true)
+			// Found an evicted tx - check if already being resubmitted to avoid duplicates
+			q.mu.Lock()
+			if qTx.isResubmitting {
+				q.mu.Unlock()
+				fmt.Printf("Tx %s is already being resubmitted - skipping\n", qTx.txHash[:16])
+				continue
+			}
+			qTx.isResubmitting = true
+			sequence := qTx.sequence
+			q.mu.Unlock()
+			fmt.Printf("Detected evicted tx with sequence %d - resubmitting\n", sequence)
+
+			q.ResubmitChan <- qTx
+		case core.TxStatusRejected:
+			q.setRecovering(true)
+			prevStatus := ""
+			if i > 0 {
+				prevStatus = statusResp.Statuses[i-1].Status.Status
+			}
+			q.handleRejected(qTx, statusRespp.Status, txClient, prevStatus)
+
+			q.removeFromQueue(qTx)
+			// deliver the rejection error to the submitter
+			select {
+			case qTx.resultsC <- SequentialSubmissionResult{
+				Error: fmt.Errorf("tx rejected with code %d: %s", statusRespp.Status.ExecutionCode, statusRespp.Status.Error),
+			}:
+			case <-q.ctx.Done():
+			}
+		}
+	}
+	q.recomputeRecoveryState(statusResp.Statuses)
+
+	statusCheckDuration := time.Since(statusCheckStart)
+	if statusCheckCount > 0 {
+		fmt.Printf("[TIMING] Status checks took %v for %d txs (avg: %v per tx)\n",
+			statusCheckDuration, statusCheckCount, statusCheckDuration/time.Duration(statusCheckCount))
+	}
+	fmt.Printf("[TIMING] Total checkBroadcastTransactions took %v\n", time.Since(startTime))
+}
+
+func (q *sequentialQueue) recomputeRecoveryState(statuses []*tx.TxStatusResult) {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+
+	inRecovery := false
+
+	for _, st := range statuses {
+		switch st.Status.Status {
+		case core.TxStatusRejected, core.TxStatusEvicted:
+			// still bad
+			inRecovery = true
+		}
+	}
+
+	if inRecovery {
+		if !q.isRecovering.Load() {
+			fmt.Println("Entering recovery mode")
+		}
+		q.isRecovering.Store(true)
+	} else {
+		if q.isRecovering.Load() {
+			fmt.Println("Exiting recovery mode")
+		}
+		q.isRecovering.Store(false)
+	}
+}
+
+// handleCommitted processes a confirmed transaction
+func (q *sequentialQueue) handleCommitted(qTx *queuedTx, statusResp *tx.TxStatusResponse) {
+	fmt.Println("Handling confirmed tx")
+	// Check execution code
+	if statusResp.ExecutionCode != abci.CodeTypeOK {
+		// Execution failed
+		select {
+		case <-q.ctx.Done():
+		case qTx.resultsC <- SequentialSubmissionResult{
+			Error: fmt.Errorf("tx execution failed with code %d: %s", statusResp.ExecutionCode, statusResp.Error),
+		}:
+		}
+		q.removeFromQueue(qTx)
+		return
+	}
+
+	// Success - send result
+	select {
+	case <-q.ctx.Done():
+		return
+	case qTx.resultsC <- SequentialSubmissionResult{
+		TxResponse: &sdktypes.TxResponse{
+			Height:    statusResp.Height,
+			TxHash:    qTx.txHash,
+			Code:      statusResp.ExecutionCode,
+			Codespace: statusResp.Codespace,
+			GasWanted: statusResp.GasWanted,
+			GasUsed:   statusResp.GasUsed,
+			Signers:   statusResp.Signers,
+		},
+		Error: nil,
+	}:
+	}
+
+	q.mu.RLock()
+	fmt.Printf("LAST CONFIRMED SEQUENCE and HASH: %d, %s\n", q.lastConfirmedSeq, qTx.txHash[:16])
+	q.mu.RUnlock()
+
+	// Update last confirmed sequence
+	q.setLastConfirmedSeq(qTx.sequence)
+	q.removeFromQueue(qTx)
+}
+
+func (q *sequentialQueue) setLastConfirmedSeq(seq uint64) {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+	q.lastConfirmedSeq = seq
+}
+
+// handleRejected processes a rejected transaction
+func (q *sequentialQueue) handleRejected(qTx *queuedTx, statusResp *tx.TxStatusResponse, txClient tx.TxClient, prevStatus string) {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+
+	rejectedSeq := qTx.sequence
+	fmt.Printf("Detected rejected tx with sequence %d\n", rejectedSeq)
+
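+	// A rejected tx never made it on-chain, so its sequence number was not
+	// consumed; the signer must be rolled back so queued txs can re-sign
+	// with the gap sequence. The guards below ensure only the first of a
+	// run of consecutive rejections triggers the rollback.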
+	// Get current sequence to check if we've already rolled back
+	currentSeq := q.client.Signer().Account(q.accountName).Sequence()
+
+	// If current sequence is already <= rejected sequence, we've already rolled back
+	// (either from a previous rejection or sequence is already correct)
+	// Since rejections are processed in order, if currentSeq > rejectedSeq, we can roll back directly
+	// if currentSeq <= rejectedSeq {
+	// 	fmt.Printf("Current sequence (%d) is already <= rejected sequence (%d) - skipping rollback\n", currentSeq, rejectedSeq)
+	// 	return
+	// }
+	if prevStatus == core.TxStatusRejected || rejectedSeq-1 == q.lastRejectedSeq {
+		fmt.Printf("Previous tx was rejected or the preceding sequence was the last rejected one - skipping rollback\n")
+		return
+	}
+
+	if rejectedSeq > currentSeq {
+		fmt.Printf("Rejected sequence (%d) is greater than current sequence (%d) - skipping rollback\n", rejectedSeq, currentSeq)
+		return
+	}
+
+	// Roll back to the rejected sequence
+	// Since rejections are processed in order, if we get here, no previous rejection caused a rollback
+	fmt.Printf("Rolling back nonce from sequence %d to sequence %d\n", currentSeq, rejectedSeq)
+	q.client.Signer().SetSequence(q.accountName, rejectedSeq)
+	fmt.Printf("Rolled back signer sequence to %d\n", rejectedSeq)
+	q.lastRejectedSeq = rejectedSeq
+}
+
+// removeFromQueue removes a transaction from the queue and frees its memory
+func (q *sequentialQueue) removeFromQueue(qTx *queuedTx) {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+
+	for i, tx := range q.queue {
+		if tx == qTx {
+			// Decrement memory counter
+			blobsMemory := calculateBlobsMemory(qTx.blobs)
+			if q.queueMemoryBytes >= blobsMemory {
+				q.queueMemoryBytes -= blobsMemory
+			} else {
+				// Safety check - should never happen
+				fmt.Printf("[WARNING] Memory accounting error: queueMemory=%d < blobsMemory=%d\n",
+					q.queueMemoryBytes, blobsMemory)
+				q.queueMemoryBytes = 0
+			}
+
+			// Remove from queue
+			q.queue = append(q.queue[:i], q.queue[i+1:]...)
+ return + } + } +} + +// // isPreviousTxConfirmed checks if the previous transaction was confirmed +// func (q *sequentialQueue) isPreviousTxConfirmed(seq uint64) bool { +// q.mu.RLock() +// defer q.mu.RUnlock() +// if seq == 0 { +// return true +// } +// return q.lastConfirmedSeq >= seq-1 +// } + +// // isPreviousTxCommittedOrPending checks if the previous transaction is COMMITTED, PENDING, or EVICTED (not REJECTED) +// // Returns true if previous tx is committed/pending/evicted, false if rejected or can't determine +// func (q *sequentialQueue) isPreviousTxCommittedOrPending(seq uint64, txClient tx.TxClient) bool { +// // If sequence is 0, there's no previous transaction - this case is handled in handleRejected +// // But if we're called with seq 0, return true to allow rollback +// if seq == 0 { +// return true +// } +// prevSeq := seq - 1 + +// // First check if it's confirmed via lastConfirmedSeq +// q.mu.RLock() +// if q.lastConfirmedSeq >= prevSeq { +// q.mu.RUnlock() +// fmt.Printf("Previous tx (seq %d) is confirmed via lastConfirmedSeq (%d)\n", prevSeq, q.lastConfirmedSeq) +// return true +// } + +// // Find the previous transaction in the queue +// var prevTx *queuedTx +// for _, txx := range q.queue { +// if txx.sequence == prevSeq && txx.txHash != "" { +// prevTx = txx +// break +// } +// } +// q.mu.RUnlock() + +// if prevTx == nil { +// // Previous transaction not in queue - assume it's confirmed +// fmt.Printf("Previous tx (seq %d) not in queue - assuming confirmed\n", prevSeq) +// return true +// } + +// // Check the actual status of the previous transaction +// statusResp, err := txClient.TxStatus(q.ctx, &tx.TxStatusRequest{TxId: prevTx.txHash}) +// if err != nil { +// // If we can't check status, assume it's confirmed (safe default) +// fmt.Printf("Failed to check status of previous tx (seq %d): %v - assuming confirmed\n", prevSeq, err) +// return true +// } +// fmt.Printf("Previous tx (seq %d) status: %s\n", prevSeq, statusResp.Status) + +// // Return true if COMMITTED, PENDING, or EVICTED (not REJECTED) +// // We roll back if previous was not rejected +// return statusResp.Status != core.TxStatusRejected +// } + +// // isSequenceMismatchRejection checks if an error message indicates sequence mismatch +// func isSequenceMismatchRejection(errMsg string) bool { +// return strings.Contains(errMsg, "account sequence mismatch") || +// strings.Contains(errMsg, "incorrect account sequence") +// } + +// parseExpectedSequence extracts the expected sequence number from error message +// e.g., "account sequence mismatch, expected 9727, got 9811" -> returns 9727 +func parseExpectedSequence(errMsg string) uint64 { + // Look for "expected " + re := regexp.MustCompile(`expected (\d+)`) + matches := re.FindStringSubmatch(errMsg) + if len(matches) >= 2 { + if seq, err := strconv.ParseUint(matches[1], 10, 64); err == nil { + return seq + } + } + return 0 +} diff --git a/pkg/user/v2/tx_client.go b/pkg/user/v2/tx_client.go index 4cca1021bc..a99a15d219 100644 --- a/pkg/user/v2/tx_client.go +++ b/pkg/user/v2/tx_client.go @@ -2,14 +2,19 @@ package v2 import ( "context" + "fmt" + "sync" + "time" "github.com/celestiaorg/celestia-app/v6/app/encoding" + apperrors "github.com/celestiaorg/celestia-app/v6/app/errors" "github.com/celestiaorg/celestia-app/v6/pkg/user" "github.com/celestiaorg/go-square/v3/share" "github.com/cosmos/cosmos-sdk/codec" codectypes "github.com/cosmos/cosmos-sdk/codec/types" "github.com/cosmos/cosmos-sdk/crypto/keyring" sdktypes "github.com/cosmos/cosmos-sdk/types" + sdktx 
"github.com/cosmos/cosmos-sdk/types/tx" "google.golang.org/grpc" ) @@ -18,6 +23,13 @@ import ( type TxClient struct { // Embed the underlying client to automatically delegate all methods *user.TxClient + + // Sequential queues per account + sequentialQueues map[string]*sequentialQueue + queueMu sync.RWMutex + + // Primary GRPC connection (stored separately for access) + conn *grpc.ClientConn } // NewTxClient creates a new v2 TxClient by wrapping the original NewTxClient function. @@ -32,8 +44,15 @@ func NewTxClient( if err != nil { return nil, err } - - return &TxClient{TxClient: v1Client}, nil + v2Client := &TxClient{ + TxClient: v1Client, + sequentialQueues: make(map[string]*sequentialQueue), + conn: conn, + } + if err := v2Client.StartSequentialQueue(context.Background(), v1Client.DefaultAccountName()); err != nil { + return nil, err + } + return v2Client, nil } // SetupTxClient creates and initializes a new v2 TxClient by wrapping the original setupTxClient method. @@ -48,13 +67,25 @@ func SetupTxClient( if err != nil { return nil, err } - - return &TxClient{TxClient: v1Client}, nil + v2Client := &TxClient{ + TxClient: v1Client, + sequentialQueues: make(map[string]*sequentialQueue), + conn: conn, + } + if err := v2Client.StartSequentialQueue(ctx, v1Client.DefaultAccountName()); err != nil { + return nil, err + } + return v2Client, nil } // Wrapv1TxClient wraps a v1 TxClient and returns a v2 TxClient. +// Note: connection will be nil, so sequential queue features requiring direct connection access won't work func Wrapv1TxClient(v1Client *user.TxClient) *TxClient { - return &TxClient{TxClient: v1Client} + return &TxClient{ + TxClient: v1Client, + sequentialQueues: make(map[string]*sequentialQueue), + conn: nil, + } } func (c *TxClient) buildSDKTxResponse(legacyResp *user.TxResponse) *sdktypes.TxResponse { @@ -120,3 +151,156 @@ func (c *TxClient) ConfirmTx(ctx context.Context, txHash string) (*sdktypes.TxRe return c.buildSDKTxResponse(legacyResp), nil } + +// StartSequentialQueue starts a sequential submission queue for the given account. +func (c *TxClient) StartSequentialQueue(ctx context.Context, accountName string) error { + return c.StartSequentialQueueWithPollTime(ctx, accountName, user.DefaultPollTime) +} + +// StartSequentialQueueWithPollTime starts a sequential queue with a custom poll time. +func (c *TxClient) StartSequentialQueueWithPollTime(ctx context.Context, accountName string, pollTime time.Duration) error { + c.queueMu.Lock() + defer c.queueMu.Unlock() + + if _, exists := c.sequentialQueues[accountName]; exists { + return fmt.Errorf("sequential queue already running for account %s", accountName) + } + + queue := newSequentialQueue(c, accountName, pollTime) + queue.start() + + c.sequentialQueues[accountName] = queue + return nil +} + +// StopSequentialQueue stops the sequential queue for the given account. +func (c *TxClient) StopSequentialQueue(accountName string) { + c.queueMu.Lock() + defer c.queueMu.Unlock() + + if queue, exists := c.sequentialQueues[accountName]; exists { + queue.cancel() + delete(c.sequentialQueues, accountName) + } +} + +// StopAllSequentialQueues stops all running sequential queues. +func (c *TxClient) StopAllSequentialQueues() { + c.queueMu.Lock() + defer c.queueMu.Unlock() + + for accountName, queue := range c.sequentialQueues { + queue.cancel() + delete(c.sequentialQueues, accountName) + } +} + +// SubmitPFBToSequentialQueue submits blobs using the sequential queue for the default account. 
+func (c *TxClient) SubmitPFBToSequentialQueue(ctx context.Context, blobs []*share.Blob, opts ...user.TxOption) (*sdktypes.TxResponse, error) { + return c.SubmitPFBToSequentialQueueWithAccount(ctx, c.DefaultAccountName(), blobs, opts...) +} + +// SubmitPFBToSequentialQueueWithAccount submits blobs using the sequential queue for the specified account. +func (c *TxClient) SubmitPFBToSequentialQueueWithAccount(ctx context.Context, accountName string, blobs []*share.Blob, opts ...user.TxOption) (*sdktypes.TxResponse, error) { + c.queueMu.RLock() + queue, exists := c.sequentialQueues[accountName] + c.queueMu.RUnlock() + + if !exists { + return nil, fmt.Errorf("sequential queue not started for account %s", accountName) + } + + resultsC := make(chan SequentialSubmissionResult, 1) + // Note: We don't close this channel because the queue worker goroutines + // may still send on it after ctx.Done(). The channel will be GC'd naturally. + + job := &SequentialSubmissionJob{ + Blobs: blobs, + Options: opts, + Ctx: ctx, + ResultsC: resultsC, + } + queue.submitJob(job) + + // Wait for result + select { + case result := <-resultsC: + if result.Error != nil { + return nil, result.Error + } + return result.TxResponse, nil + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +// GetSequentialQueueSize returns the number of pending transactions in the queue for the given account. +func (c *TxClient) GetSequentialQueueSize(accountName string) (int, error) { + c.queueMu.RLock() + queue, exists := c.sequentialQueues[accountName] + c.queueMu.RUnlock() + + if !exists { + return 0, fmt.Errorf("sequential queue not started for account %s", accountName) + } + + return queue.GetQueueSize(), nil +} + +// BroadcastPayForBlobWithoutRetry broadcasts a PayForBlob transaction without automatic retry logic. +// Returns the transaction response and the raw transaction bytes. +func (c *TxClient) BroadcastPayForBlobWithoutRetry(ctx context.Context, accountName string, blobs []*share.Blob, opts ...user.TxOption) (*sdktypes.TxResponse, []byte, error) { + // Use BroadcastPayForBlobWithAccount but without confirmation + resp, txBytes, err := c.TxClient.BroadcastPayForBlobWithAccount(ctx, accountName, blobs, opts...) 
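+	// This delegates to the embedded v1 client: sign and broadcast only,
+	// with no confirmation or retry; the sequential queue's monitor loop
+	// takes over from here.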
+ if err != nil { + return nil, nil, err + } + + return resp, txBytes, nil +} + +// ResubmitTxBytes resubmits a transaction using pre-signed bytes without retry logic +func (c *TxClient) ResubmitTxBytes(ctx context.Context, txBytes []byte) (*sdktypes.TxResponse, error) { + // Get the connection for broadcasting + conn := c.GetGRPCConnection() + if conn == nil { + return nil, fmt.Errorf("no connection available") + } + + // Use the SDK tx service client to broadcast + sdktxClient := sdktx.NewServiceClient(conn) + resp, err := sdktxClient.BroadcastTx(ctx, &sdktx.BroadcastTxRequest{ + Mode: sdktx.BroadcastMode_BROADCAST_MODE_SYNC, + TxBytes: txBytes, + }) + if err != nil { + return nil, err + } + + // Check if broadcast was successful + if resp.TxResponse.Code != 0 { + return nil, fmt.Errorf("broadcast failed with code %d: %s", resp.TxResponse.Code, resp.TxResponse.RawLog) + } + + return resp.TxResponse, nil +} + +// GetGRPCConnection returns the primary GRPC connection for creating tx status clients +func (c *TxClient) GetGRPCConnection() *grpc.ClientConn { + return c.conn +} + +// IsSequenceMismatchError checks if an error is a sequence mismatch (nonce mismatch) +func IsSequenceMismatchError(err error) bool { + if err == nil { + return false + } + + // Check if it's a BroadcastTxError with sequence mismatch code + broadcastTxErr, ok := err.(*user.BroadcastTxError) + if !ok { + return false + } + + return apperrors.IsNonceMismatchCode(broadcastTxErr.Code) +} diff --git a/proto/celestia/core/v1/tx/tx.proto b/proto/celestia/core/v1/tx/tx.proto index 72c22e6d71..f0b08d0073 100644 --- a/proto/celestia/core/v1/tx/tx.proto +++ b/proto/celestia/core/v1/tx/tx.proto @@ -18,6 +18,14 @@ service Tx { get: "/celestia/core/v1/tx/{tx_id}" }; } + + // TxStatusBatch for batch queries + rpc TxStatusBatch(TxStatusBatchRequest) returns (TxStatusBatchResponse) { + option (google.api.http) = { + post: "/celestia/core/v1/tx/batch" + body: "*" + }; + } } // TxStatusRequest is the request type for the TxStatus gRPC method. @@ -47,3 +55,20 @@ message TxStatusResponse { // signers of the transaction. repeated string signers = 9; } + +// TxStatusBatchRequest is the request type for the batch TxStatus gRPC method. +message TxStatusBatchRequest { + // array of hex encoded tx hashes (each hash should be 64 characters long representing 32 bytes) + repeated string tx_ids = 1; +} + +// TxStatusResult represents a single transaction status result in a batch response. +message TxStatusResult { + string tx_hash = 1; + TxStatusResponse status = 2; +} + +// TxStatusBatchResponse is a response type for batched TxStatus query. It contains an array of transaction status results. 
+message TxStatusBatchResponse {
+  repeated TxStatusResult statuses = 1;
+}
diff --git a/scripts/mocha.sh b/scripts/mocha.sh
index 2e268f277d..869b74ebe8 100755
--- a/scripts/mocha.sh
+++ b/scripts/mocha.sh
@@ -8,8 +8,8 @@ set -o nounset # Stop script execution if an undefined variable is used
 CHAIN_ID="mocha-4"
 NODE_NAME="node-name"
-SEEDS="b402fe40f3474e9e208840702e1b7aa37f2edc4b@celestia-testnet-seed.itrocket.net:14656"
-PEERS="daf2cecee2bd7f1b3bf94839f993f807c6b15fbf@celestia-testnet-peer.itrocket.net:11656,96b2761729cea90ee7c61206433fc0ba40c245bf@57.128.141.126:11656,f4f75a55bfc5f302ef34435ef096a4551ecb6804@152.53.33.96:12056,31bb1c9c1be7743d1115a8270bd1c83d01a9120a@148.72.141.31:26676,3e30bcfc55e7d351f18144aab4b0973e9e9bf987@65.108.226.183:11656,7a0d5818c0e5b0d4fbd86a9921f413f5e4e4ac1e@65.109.83.40:28656,43e9da043318a4ea0141259c17fcb06ecff816af@164.132.247.253:43656,5a7566aa030f7e5e7114dc9764f944b2b1324bcd@65.109.23.114:11656,c17c0cbf05e98656fee5f60fad469fc528f6d6de@65.109.25.113:11656,fb5e0b9efacc11916c58bbcd3606cbaa7d43c99f@65.108.234.84:28656,45504fb31eb97ea8778c920701fc8076e568a9cd@188.214.133.100:26656,edafdf47c443344fb940a32ab9d2067c482e59df@84.32.71.47:26656,ae7d00d6d70d9b9118c31ac0913e0808f2613a75@177.54.156.69:26656,7c841f59c35d70d9f1472d7d2a76a11eefb7f51f@136.243.69.100:43656"
+SEEDS="b402fe40f3474e9e208840702e1b7aa37f2edc4b@celestia-testnet-seed.itrocket.net:14656,ee9f90974f85c59d3861fc7f7edb10894f6ac3c8@seed-mocha.pops.one:26656"
+PEERS="fb5e0b9efacc11916c58bbcd3606cbaa7d43c99f@65.108.234.84:28656,7a649733c5ae1b8bba9a5d855d697811646a0f6a@184.107.149.93:36656,7acb49ef77a268b8ae134ad9db3632c933e5013a@212.83.43.40:26656,f9e950870eccdb40e2386896d7b6a7687a103c99@72.251.3.24:43656,2666d40498d8d435d7b29af7d157b64bcc37a39a@162.249.168.87:26656,ad95eb726622347266baf0913b065103a8f9d6ff@159.195.26.108:23656"
 RPC="https://celestia-testnet-rpc.itrocket.net:443"
 CELESTIA_APP_HOME="${HOME}/.celestia-app"
@@ -42,6 +42,9 @@ sed -i.bak -e "s/^seeds *=.*/seeds = \"$SEEDS\"/" $CELESTIA_APP_HOME/config/conf
 echo "Setting persistent peers in config.toml..."
 sed -i -e "/^\[p2p\]/,/^\[/{s/^[[:space:]]*persistent_peers *=.*/persistent_peers = \"$PEERS\"/;}" $CELESTIA_APP_HOME/config/config.toml
+echo "Setting ttl-num-blocks to 1 in config.toml..."
+sed -i.bak -e "s/^ttl-num-blocks *=.*/ttl-num-blocks = 1/" $CELESTIA_APP_HOME/config/config.toml
+
 LATEST_HEIGHT=$(curl -s $RPC/block | jq -r .result.block.header.height);
 BLOCK_HEIGHT=$((LATEST_HEIGHT - 2000)); \
 TRUST_HASH=$(curl -s "$RPC/block?height=$BLOCK_HEIGHT" | jq -r .result.block_id.hash)
@@ -49,14 +52,15 @@ TRUST_HASH=$(curl -s "$RPC/block?height=$BLOCK_HEIGHT" | jq -r .result.block_id.
 echo "Latest height: $LATEST_HEIGHT"
 echo "Block height: $BLOCK_HEIGHT"
 echo "Trust hash: $TRUST_HASH"
-echo "Enabling state sync in config.toml..."
+echo "Enabling state sync and gRPC in config.toml..."
 sed -i.bak -E "s|^(enable[[:space:]]+=[[:space:]]+).*$|\1true| ; \
 s|^(rpc_servers[[:space:]]+=[[:space:]]+).*$|\1\"$RPC,$RPC\"| ; \
 s|^(trust_height[[:space:]]+=[[:space:]]+).*$|\1$BLOCK_HEIGHT| ; \
-s|^(trust_hash[[:space:]]+=[[:space:]]+).*$|\1\"$TRUST_HASH\"|" $HOME/.celestia-app/config/config.toml
+s|^(trust_hash[[:space:]]+=[[:space:]]+).*$|\1\"$TRUST_HASH\"| ; \
+s|^(grpc_laddr[[:space:]]+=[[:space:]]+).*$|\1\"tcp://0.0.0.0:9098\"|" $HOME/.celestia-app/config/config.toml
 echo "Downloading genesis file..."
 celestia-appd download-genesis ${CHAIN_ID} > /dev/null 2>&1 # Hide output to reduce terminal noise
 echo "Starting celestia-appd..."
-celestia-appd start --force-no-bbr
+celestia-appd start --force-no-bbr --p2p.persistent_peers=${PEERS} --grpc.enable --grpc.address="0.0.0.0:9090"
diff --git a/tools/latency-monitor/main.go b/tools/latency-monitor/main.go
index d7296b0d0e..302033b5c3 100644
--- a/tools/latency-monitor/main.go
+++ b/tools/latency-monitor/main.go
@@ -16,6 +16,7 @@ import (
 	"github.com/celestiaorg/celestia-app/v6/app"
 	"github.com/celestiaorg/celestia-app/v6/app/encoding"
 	"github.com/celestiaorg/celestia-app/v6/pkg/user"
+	v2 "github.com/celestiaorg/celestia-app/v6/pkg/user/v2"
 	"github.com/celestiaorg/go-square/v3/share"
 	"github.com/cosmos/cosmos-sdk/crypto/keyring"
 	"github.com/spf13/cobra"
@@ -157,12 +158,19 @@ func monitorLatency(
 	if accountName != "" {
 		opts = append(opts, user.WithDefaultAccount(accountName))
 	}
-	txClient, err := user.SetupTxClient(ctx, kr, grpcConn, encCfg, opts...)
+	txClient, err := v2.SetupTxClient(ctx, kr, grpcConn, encCfg, opts...)
 	if err != nil {
 		return fmt.Errorf("failed to create tx client: %w", err)
 	}
 	fmt.Printf("Using account: %s\n", txClient.DefaultAccountName())
+	fmt.Println("Sequential queue started for transaction submission")
+
+	// Ensure sequential queue is stopped on exit
+	defer func() {
+		fmt.Println("Stopping sequential queue...")
+		txClient.StopAllSequentialQueues()
+	}()

 	fmt.Println("Submitting transactions...")

@@ -207,63 +215,58 @@
 			submitTime := time.Now()

-			// Broadcast transaction without waiting for confirmation
-			resp, err := txClient.BroadcastPayForBlob(ctx, []*share.Blob{blob})
-			if err != nil {
-				fmt.Printf("Failed to broadcast tx: %v\n", err)
-				continue
-			}
-
-			fmt.Printf("[SUBMIT] tx=%s size=%d bytes time=%s\n",
-				resp.TxHash[:16], randomSize, submitTime.Format("15:04:05.000"))
-
-			if disableMetrics {
-				continue
-			}
-
-			// Launch background goroutine to confirm the transaction
-			go func(txHash string, submitTime time.Time) {
-				confirmed, err := txClient.ConfirmTx(ctx, txHash)
+			// Submit to sequential queue (handles both broadcast and confirmation)
+			go func(submitTime time.Time, blobData []*share.Blob, size int) {
+				resp, err := txClient.SubmitPFBToSequentialQueue(ctx, blobData)
 				if err != nil {
 					if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
-						fmt.Printf("[CANCELLED] tx=%s context closed before confirmation\n", txHash[:16])
+						fmt.Printf("[CANCELLED] context closed before submission\n")
 						return
 					}
-					resultsMux.Lock()
-					// Track failed confirmation
-					fmt.Printf("[FAILED] tx=%s error=%v\n", txHash[:16], err)
-					results = append(results, txResult{
-						submitTime: submitTime,
-						commitTime: time.Now(),
-						latency:    0,
-						txHash:     txHash,
-						code:       0,
-						height:     0,
-						failed:     true,
-						errorMsg:   err.Error(),
-					})
-					resultsMux.Unlock()
+					if !disableMetrics {
+						resultsMux.Lock()
+						// Track failed submission/confirmation
+						fmt.Printf("[FAILED] error=%v\n", err)
+						results = append(results, txResult{
+							submitTime: submitTime,
+							commitTime: time.Now(),
+							latency:    0,
+							txHash:     "",
+							code:       0,
+							height:     0,
+							failed:     true,
+							errorMsg:   err.Error(),
+						})
+						resultsMux.Unlock()
+					}
+					return
+				}
+
+				fmt.Printf("[SUBMIT] 
tx=%s size=%d bytes time=%s\n", + resp.TxHash[:16], size, submitTime.Format("15:04:05.000")) + + if disableMetrics { return } - resultsMux.Lock() // Track successful confirmation commitTime := time.Now() latency := commitTime.Sub(submitTime) + resultsMux.Lock() fmt.Printf("[CONFIRM] tx=%s height=%d latency=%dms code=%d time=%s\n", - confirmed.TxHash[:16], confirmed.Height, latency.Milliseconds(), confirmed.Code, commitTime.Format("15:04:05.000")) + resp.TxHash[:16], resp.Height, latency.Milliseconds(), resp.Code, commitTime.Format("15:04:05.000")) results = append(results, txResult{ submitTime: submitTime, commitTime: commitTime, latency: latency, - txHash: confirmed.TxHash, - code: confirmed.Code, - height: confirmed.Height, + txHash: resp.TxHash, + code: resp.Code, + height: resp.Height, failed: false, errorMsg: "", }) resultsMux.Unlock() - }(resp.TxHash, submitTime) + }(submitTime, []*share.Blob{blob}, randomSize) } } } diff --git a/tools/spam_txclient/main.go b/tools/spam_txclient/main.go index 85ea6c9029..6c8e43fd41 100644 --- a/tools/spam_txclient/main.go +++ b/tools/spam_txclient/main.go @@ -14,9 +14,11 @@ import ( "github.com/celestiaorg/celestia-app/v6/app" "github.com/celestiaorg/celestia-app/v6/app/encoding" "github.com/celestiaorg/celestia-app/v6/pkg/user" + v2 "github.com/celestiaorg/celestia-app/v6/pkg/user/v2" "github.com/celestiaorg/go-square/v3/share" "github.com/cosmos/cosmos-sdk/client/grpc/cmtservice" "github.com/cosmos/cosmos-sdk/crypto/keyring" + sdktypes "github.com/cosmos/cosmos-sdk/types" "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -31,10 +33,10 @@ type Config struct { func main() { cfg := Config{ - MochaEndpoint: "rpc-mocha.pops.one:9090", + MochaEndpoint: "localhost:9090", BlobSizeKB: 300, // 300 KiB blobs IntervalMs: 1000, // Submit every 1 second - TestDurationSec: 240, // Run for 240 seconds + TestDurationSec: 0, // Run forever (0 = no timeout) } err := RunLoadTest(cfg) @@ -45,16 +47,25 @@ func main() { // RunLoadTest sets up the tx client, runs submissions, and reports results. 
func RunLoadTest(cfg Config) error { - log.Println("Setting up tx client and connecting to a mocha node") - - // Global test context with configured timeout - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(cfg.TestDurationSec)*time.Second) + log.Println("Setting up tx client v2 with sequential queue and connecting to a mocha node") + + // Global test context with optional timeout + var ctx context.Context + var cancel context.CancelFunc + if cfg.TestDurationSec > 0 { + ctx, cancel = context.WithTimeout(context.Background(), time.Duration(cfg.TestDurationSec)*time.Second) + log.Printf("Running for %d seconds", cfg.TestDurationSec) + } else { + ctx, cancel = context.WithCancel(context.Background()) + log.Println("Running forever (Ctrl+C to stop)") + } defer cancel() - txClient, grpcConn, _, err := NewMochaTxClient(ctx, cfg) + txClient, grpcConn, cleanupFunc, err := NewMochaTxClientV2(ctx, cfg) if err != nil { - return fmt.Errorf("failed to set up tx client: %w", err) + return fmt.Errorf("failed to set up tx client v2: %w", err) } + defer cleanupFunc() var ( txCounter int64 @@ -78,83 +89,92 @@ func RunLoadTest(cfg Config) error { case <-ctx.Done(): return ctx.Err() case <-ticker.C: - if time.Since(time.Unix(0, lastSuccess.Load())) > 10*time.Second { - return fmt.Errorf("TxClient appears halted: no successful submission recently") + if time.Since(time.Unix(0, lastSuccess.Load())) > 10*time.Minute { + return fmt.Errorf("TxClient appears halted: no successful submission in 10 minutes") } id := atomic.AddInt64(&txCounter, 1) - // Separate goroutine for broadcasting and confirming txs + // Separate goroutine for submitting txs via sequential queue g.Go(func() error { - hash, err := BroadcastPayForBlob(ctx, txClient, grpcConn, cfg.BlobSizeKB, id) - if err != nil || hash == "" { - fmt.Printf("\nTX-%d: Broadcast failed: %v\n", id, err) + // Use background context so transaction never times out + txCtx := context.Background() + + resp, err := SubmitPayForBlobSequential(txCtx, txClient, grpcConn, cfg.BlobSizeKB, id) + if err != nil { + fmt.Printf("\nTX-%d: Sequential submission failed: %v\n", id, err) failedBroadcasts.Add(1) return nil } lastSuccess.Store(time.Now().UnixNano()) - fmt.Printf("\nTX-%d: Broadcast success (hash %s)\n", id, hash) + fmt.Printf("\nTX-%d: Sequential submission success (hash %s)\n", id, resp.TxHash) successfulBroadcasts.Add(1) - resp, err := txClient.ConfirmTx(ctx, hash) - if err != nil { - fmt.Printf("\nTX-%d: Confirm failed: %v\n", id, err) + // Sequential queue handles confirmation internally + if resp.Code == 0 { + fmt.Printf("\nTX-%d: Confirmed successfully: %s\n", id, resp.TxHash) + successfulConfirms.Add(1) + } else { + fmt.Printf("\nTX-%d: Execution failed with code %d\n", id, resp.Code) failedConfirms.Add(1) - return nil } - - fmt.Printf("\nTX-%d: Confirm success for %s: %d\n", id, hash, resp.Code) - successfulConfirms.Add(1) return nil }) } } }) - // This should only fail if the client halts + // This should only fail if the client halts unexpectedly err = g.Wait() - if err != nil && !errors.Is(err, context.DeadlineExceeded) { - return err + if err != nil && !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) { + return fmt.Errorf("tx client v2 halted unexpectedly: %w", err) } - fmt.Println("\nScript completed successfully!!") - fmt.Printf("Successful broadcasts: %d\n", successfulBroadcasts.Load()) - fmt.Printf("Successful confirms: %d\n", successfulConfirms.Load()) - fmt.Printf("Failed confirms: %d\n", 
failedConfirms.Load()) - fmt.Printf("Failed broadcasts: %d\n", failedBroadcasts.Load()) + fmt.Println("\n=== TxClient V2 Sequential Queue Test Results ===") + fmt.Printf("Successful sequential submissions: %d\n", successfulBroadcasts.Load()) + fmt.Printf("Successful confirmations: %d\n", successfulConfirms.Load()) + fmt.Printf("Failed confirmations: %d\n", failedConfirms.Load()) + fmt.Printf("Failed submissions: %d\n", failedBroadcasts.Load()) + + queueSize, err := txClient.GetSequentialQueueSize(txClient.DefaultAccountName()) + if err == nil { + fmt.Printf("Final queue size: %d\n", queueSize) + } cancel() // Clean up context before exit return nil } -func BroadcastPayForBlob(ctx context.Context, txClient *user.TxClient, grpcConn *grpc.ClientConn, blobSizeKB int, txID int64) (string, error) { +func SubmitPayForBlobSequential(ctx context.Context, txClient *v2.TxClient, grpcConn *grpc.ClientConn, blobSizeKB int, txID int64) (*sdktypes.TxResponse, error) { // Create random blob data of the given size blobData := make([]byte, blobSizeKB*1024) if _, err := cryptorand.Read(blobData); err != nil { - return "", err + return nil, err } namespace := share.RandomBlobNamespace() blob, err := share.NewV0Blob(namespace, blobData) if err != nil { - return "", err + return nil, err } currentHeight, err := getCurrentBlockHeight(ctx, grpcConn) if err != nil { - return "", err + return nil, err } - // Set timeout height to be between 1 and 5 blocks from the current height - timeoutHeight := currentHeight + int64(rand.Intn(5)) - resp, err := txClient.BroadcastPayForBlob(ctx, []*share.Blob{blob}, user.SetTimeoutHeight(uint64(timeoutHeight))) + // Set timeout height to be between 3 and 10 blocks from the current height + timeoutHeight := currentHeight + int64(rand.Intn(8)+3) + + // Submit via sequential queue - this handles signing, broadcasting, and confirmation + resp, err := txClient.SubmitPFBToSequentialQueue(ctx, []*share.Blob{blob}, user.SetTimeoutHeight(uint64(timeoutHeight))) if err != nil { - return "", err + return nil, err } - return resp.TxHash, nil + return resp, nil } // getCurrentBlockHeight gets the current block height from the chain @@ -176,7 +196,7 @@ func getCurrentBlockHeight(ctx context.Context, grpcConn *grpc.ClientConn) (int6 return resp.SdkBlock.Header.Height, nil } -func NewMochaTxClient(ctx context.Context, cfg Config) (*user.TxClient, *grpc.ClientConn, context.CancelFunc, error) { +func NewMochaTxClientV2(ctx context.Context, cfg Config) (*v2.TxClient, *grpc.ClientConn, func(), error) { encCfg := encoding.MakeConfig(app.ModuleEncodingRegisters...) 
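+	// NOTE: keyring.New treats the directory literally; Go does not expand
+	// "~", so "~/.celestia-app" below is created relative to the working
+	// directory. Pass an absolute path to use the real home directory.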
 kr, err := keyring.New(app.Name, keyring.BackendTest, "~/.celestia-app", nil, encCfg.Codec)
 if err != nil {
@@ -196,12 +216,22 @@ func NewMochaTxClient(ctx context.Context, cfg Config) (*user.TxClient, *grpc.Cl
 	}

 	clientCtx, cancel := context.WithCancel(ctx)
-	txClient, err := user.SetupTxClient(clientCtx, kr, grpcConn, encCfg)
+
+	// Create v2 TxClient with sequential queue
+	txClient, err := v2.SetupTxClient(clientCtx, kr, grpcConn, encCfg)
 	if err != nil {
 		grpcConn.Close()
 		cancel()
-		return nil, nil, nil, fmt.Errorf("failed to create tx client: %w", err)
+		return nil, nil, nil, fmt.Errorf("failed to create tx client v2: %w", err)
+	}
+
+	cleanup := func() {
+		txClient.StopAllSequentialQueues()
+		grpcConn.Close()
+		cancel()
 	}

-	return txClient, grpcConn, cancel, nil
+	log.Printf("Sequential queue started for account: %s", txClient.DefaultAccountName())
+
+	return txClient, grpcConn, cleanup, nil
+}
diff --git a/tools/talis/docker/Dockerfile b/tools/talis/docker/Dockerfile
index 5564c69589..6adf7be151 100644
--- a/tools/talis/docker/Dockerfile
+++ b/tools/talis/docker/Dockerfile
@@ -12,6 +12,6 @@ RUN go mod download

 COPY . .

-RUN GOOS=${GOOS} GOARCH=${GOARCH} CGO_ENABLED=0 go build -tags="ledger" -ldflags="${LDFLAGS}" -o /out/txsim ./test/cmd/txsim \
-    && GOOS=${GOOS} GOARCH=${GOARCH} CGO_ENABLED=0 GOOS=linux go build -tags="ledger" -ldflags="${LDFLAGS}" -o /out/celestia-appd ./cmd/celestia-appd \
-    && GOOS=${GOOS} GOARCH=${GOARCH} CGO_ENABLED=0 go build -tags="ledger" -ldflags="${LDFLAGS}" -o /out/latency-monitor ./tools/latency-monitor
+# RUN GOOS=${GOOS} GOARCH=${GOARCH} CGO_ENABLED=0 go build -tags="ledger" -ldflags="${LDFLAGS}" -o /out/txsim ./test/cmd/txsim
+RUN GOOS=${GOOS} GOARCH=${GOARCH} CGO_ENABLED=0 GOOS=linux go build -tags="ledger" -ldflags="${LDFLAGS}" -o /out/celestia-appd ./cmd/celestia-appd
+# && GOOS=${GOOS} GOARCH=${GOARCH} CGO_ENABLED=0 go build -tags="ledger" -ldflags="${LDFLAGS}" -o /out/latency-monitor ./tools/latency-monitor
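
For reviewers: a minimal end-to-end sketch of the v2 API introduced in this diff. The gRPC endpoint (localhost:9090) and keyring directory (/tmp/celestia-keys) are placeholders, the account is assumed funded, and the blob payload is arbitrary; only SetupTxClient, StopAllSequentialQueues, and SubmitPFBToSequentialQueue come from this change set. SetupTxClient starts the sequential queue for the default account, and SubmitPFBToSequentialQueue blocks until the queue observes the tx committed or surfaces a broadcast/rejection error.

package main

import (
	"context"
	"log"

	"github.com/celestiaorg/celestia-app/v6/app"
	"github.com/celestiaorg/celestia-app/v6/app/encoding"
	v2 "github.com/celestiaorg/celestia-app/v6/pkg/user/v2"
	"github.com/celestiaorg/go-square/v3/share"
	"github.com/cosmos/cosmos-sdk/crypto/keyring"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	ctx := context.Background()
	encCfg := encoding.MakeConfig(app.ModuleEncodingRegisters...)

	// Placeholder endpoint; point this at a node running the new TxStatusBatch service.
	conn, err := grpc.NewClient("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Placeholder keyring directory containing a funded test account.
	kr, err := keyring.New(app.Name, keyring.BackendTest, "/tmp/celestia-keys", nil, encCfg.Codec)
	if err != nil {
		log.Fatal(err)
	}

	// Starts a sequential queue for the default account under the hood.
	client, err := v2.SetupTxClient(ctx, kr, conn, encCfg)
	if err != nil {
		log.Fatal(err)
	}
	defer client.StopAllSequentialQueues()

	blob, err := share.NewV0Blob(share.RandomBlobNamespace(), []byte("hello"))
	if err != nil {
		log.Fatal(err)
	}

	// Queues the PFB; the call returns once the queue reports the tx
	// committed (or an error from broadcast, eviction, or rejection).
	resp, err := client.SubmitPFBToSequentialQueue(ctx, []*share.Blob{blob})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("committed at height %d: %s", resp.Height, resp.TxHash)
}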