gRPC Golang Master Class Build Modern API & Microservices
lesson1
lesson2
lesson4
lesson5
lesson6
lesson7
lesson8
lesson9
lesson10
lesson11
go get -u google.golang.org/grpc
go get -u github.com/golang/protobuf/protoc-gen-go
lesson12
syntax = "proto3";
package greet;
option go_package="greetpb";
service GreetService{}
protoc greet/greetpb/greet.proto --go_out=plugins=grpc:.
syntax = "proto3";
package greet;
// 指定输出目录,是相对于运行protoc编译命令的目录
option go_package="./greet/greetpb";
service GreetService{}
lesson13
package main
import (
"fmt"
"log"
"net"
"rpc-learn/greet/greetpb"
"google.golang.org/grpc"
)
type server struct{}
func main() {
fmt.Println("Hello")
lis, err := net.Listen("tcp", "0.0.0.0:50051")
if err != nil {
log.Fatalf("Failed to listen: %v", err)
}
s := grpc.NewServer()
// 调用grpc,注册服务
greetpb.RegisterGreetServiceServer(s, &server{})
// 开启服务器
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
go run greet/greet_server/server.go
lesson14
package main
import (
"fmt"
"log"
"rpc-learn/greet/greetpb"
"google.golang.org/grpc"
)
func main() {
fmt.Println("client")
// WithInsecure: 不安全连接
conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
log.Fatalf("could not connect: %v", err)
}
defer conn.Close()
c := greetpb.NewGreetServiceClient(conn)
fmt.Printf("Created client: %f", c)
}
lesson15
一元调用特点:
- 基本的请求响应,交换一条请求和响应
- 常用
- 适合数据量小的情况
lesson16
syntax = "proto3";
package greet;
// 指定输出目录,是相对于运行protoc编译命令的目录
option go_package="./greet/greetpb";
// api间交换的数据结构
// 1,2等数字是数据的填充顺序
message Greeting {
string first_name = 1;
string last_name = 2;
}
message GreetRequest {
Greeting greeting = 1;
}
message GreetResponse {
string result = 1;
}
service GreetService{
// Unary
rpc Greet(GreetRequest) returns (GreetResponse) {};
}
lesson17
package main
import (
"context"
"fmt"
"log"
"net"
"rpc-learn/greet/greetpb"
"google.golang.org/grpc"
)
type server struct{}
func (*server) Greet(ctx context.Context, req *greetpb.GreetRequest) (*greetpb.GreetResponse, error) {
log.Printf("Greet function was invoked with %v\n", req)
firstName := req.GetGreeting().GetFirstName()
result := "Hello " + firstName
res := &greetpb.GreetResponse{
Result: result,
}
return res, nil
}
func main() {
// ...
}
lesson18
package main
import (
"context"
"fmt"
"log"
"rpc-learn/greet/greetpb"
"google.golang.org/grpc"
)
func main() {
fmt.Println("client")
// WithInsecure: 不安全连接
conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
log.Fatalf("could not connect: %v", err)
}
defer conn.Close()
c := greetpb.NewGreetServiceClient(conn)
// fmt.Printf("Created client: %f", c)
doUnary(c)
}
func doUnary(c greetpb.GreetServiceClient) {
log.Println("Starting to do a Unary RPC...")
req := &greetpb.GreetRequest{
Greeting: &greetpb.Greeting{
FirstName: "Stephane",
LastName: "Maarek",
},
}
res, err := c.Greet(context.Background(), req)
if err != nil {
log.Fatalf("error while calling Greet RPC: %v", err)
}
log.Printf("Response from Greet: %v", res.Result)
}
lesson19
syntax = "proto3";
package calculator;
option go_package = "./calculator/calculatorpb";
message SumRequest {
int32 first_number = 1;
int32 second_number = 2;
}
message SumResponse {
int32 sum_result = 1;
}
service CalculatorService {
rpc Sum(SumRequest) returns (SumResponse) {};
}
package main
import (
"context"
"fmt"
"log"
"net"
"rpc-learn/calculator/calculatorpb"
"google.golang.org/grpc"
)
type server struct{}
func (*server) Sum(ctx context.Context, req *calculatorpb.SumRequest) (*calculatorpb.SumResponse, error) {
log.Printf("Received Sum RPC: %v", req)
firstNumber := req.FirstNumber
secondNumber := req.SecondNumber
sum := firstNumber + secondNumber
res := &calculatorpb.SumResponse{
SumResult: sum,
}
return res, nil
}
func main() {
fmt.Println("Calculator Server")
lis, err := net.Listen("tcp", "0.0.0.0:50051")
if err != nil {
log.Fatalf("Failed to listen: %v", err)
}
s := grpc.NewServer()
// 调用grpc,注册服务
calculatorpb.RegisterCalculatorServiceServer(s, &server{})
// 开启服务器
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
package main
import (
"context"
"fmt"
"log"
"rpc-learn/calculator/calculatorpb"
"google.golang.org/grpc"
)
func main() {
fmt.Println("Calculator Client")
// WithInsecure: 不安全连接
conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
log.Fatalf("could not connect: %v", err)
}
defer conn.Close()
c := calculatorpb.NewCalculatorServiceClient(conn)
// fmt.Printf("Created client: %f", c)
doUnary(c)
}
func doUnary(c calculatorpb.CalculatorServiceClient) {
log.Println("Starting to do a Sum Unary RPC...")
req := &calculatorpb.SumRequest{
FirstNumber: 56,
SecondNumber: 8,
}
res, err := c.Sum(context.Background(), req)
if err != nil {
log.Fatalf("error while calling Sum RPC: %v", err)
}
log.Printf("Response from Greet: %v", res.SumResult)
}
lesson20
流式传输 api
- 适用于大型数据
- 发送多个请求包
- 适用于推送消息
lesson21
syntax = "proto3";
package greet;
// 指定输出目录,是相对于运行protoc编译命令的目录
option go_package="./greet/greetpb";
// api间交换的数据结构
// 1,2等数字是数据的填充顺序
message Greeting {
string first_name = 1;
string last_name = 2;
}
// ...
message GreetManyTimesRequest {
Greeting greeting = 1;
}
message GreetManyTimesResponse {
string result = 1;
}
service GreetService{
// Unary
rpc Greet(GreetRequest) returns (GreetResponse) {};
// Server Streaming
rpc GreetManyTimes(GreetManyTimesRequest) returns (stream GreetManyTimesResponse) {};
}
lesson22
package main
import (
"context"
"fmt"
"log"
"net"
"rpc-learn/greet/greetpb"
"strconv"
"time"
"google.golang.org/grpc"
)
type server struct{}
func (*server) Greet(ctx context.Context, req *greetpb.GreetRequest) (*greetpb.GreetResponse, error) {
// ...
}
func (*server) GreetManyTimes(req *greetpb.GreetManyTimesRequest, stream greetpb.GreetService_GreetManyTimesServer) error {
log.Printf("GreetManyTimes function was invoked with %v\n", req)
firstName := req.GetGreeting().GetFirstName()
for i := 0; i < 10; i++ {
result := "Hello" + firstName + " number" + strconv.Itoa(i)
res := &greetpb.GreetManyTimesResponse{
Result: result,
}
stream.Send(res)
time.Sleep(1000 * time.Millisecond)
}
return nil
}
func main() {
// ...
}
lesson23
package main
import (
"context"
"fmt"
"io"
"log"
"rpc-learn/greet/greetpb"
"google.golang.org/grpc"
)
func main() {
fmt.Println("client")
// WithInsecure: 不安全连接
conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
log.Fatalf("could not connect: %v", err)
}
defer conn.Close()
c := greetpb.NewGreetServiceClient(conn)
// doUnary(c)
doServerStreaming(c)
}
func doUnary(c greetpb.GreetServiceClient) {
// ...
}
func doServerStreaming(c greetpb.GreetServiceClient) {
log.Println("Starting to do a Server Streaming RPC...")
req := &greetpb.GreetManyTimesRequest{
Greeting: &greetpb.Greeting{
FirstName: "Stephane",
LastName: "Maarek",
},
}
resStream, err := c.GreetManyTimes(context.Background(), req)
if err != nil {
log.Fatalf("error while calling GreetManyTimes RPC: %v", err)
}
for {
msg, err := resStream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("error while reading stream: %v", err)
}
log.Printf("Response from GreetManyTimes: %v", msg.GetResult())
}
}
lesson24
因式分解
syntax = "proto3";
package calculator;
option go_package = "./calculator/calculatorpb";
//...
message PrimeNumberDecompositionRequest {
int64 number = 1;
}
message PrimeNumberDecompositionResponse {
int64 prime_factor = 1;
}
service CalculatorService {
rpc Sum(SumRequest) returns (SumResponse) {};
rpc PrimeNumberDecomposition(PrimeNumberDecompositionRequest)
returns (stream PrimeNumberDecompositionResponse) {};
}
package main
import (
"context"
"fmt"
"log"
"net"
"rpc-learn/calculator/calculatorpb"
"google.golang.org/grpc"
)
type server struct{}
func (*server) Sum(ctx context.Context, req *calculatorpb.SumRequest) (*calculatorpb.SumResponse, error) {
// ...
}
func (*server) PrimeNumberDecomposition(
req *calculatorpb.PrimeNumberDecompositionRequest,
stream calculatorpb.CalculatorService_PrimeNumberDecompositionServer) error {
log.Printf("Received PrimeNumberDecomposition RPC: %v", req)
number := req.GetNumber()
divisor := int64(2)
for number > 1 {
if number%divisor == 0 {
stream.Send(&calculatorpb.PrimeNumberDecompositionResponse{
PrimeFactor: divisor,
})
number = number / divisor
} else {
divisor++
log.Printf("Divisor has increased to %v", divisor)
}
}
return nil
}
func main() {
// ...
}
package main
import (
"context"
"fmt"
"io"
"log"
"rpc-learn/calculator/calculatorpb"
"google.golang.org/grpc"
)
func main() {
fmt.Println("Calculator Client")
// WithInsecure: 不安全连接
conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
log.Fatalf("could not connect: %v", err)
}
defer conn.Close()
c := calculatorpb.NewCalculatorServiceClient(conn)
// fmt.Printf("Created client: %f", c)
// doUnary(c)
doServerStreaming(c)
}
func doUnary(c calculatorpb.CalculatorServiceClient) {
// ...
}
func doServerStreaming(c calculatorpb.CalculatorServiceClient) {
log.Println("Starting to do a PrimeNumberDecomposition RPC...")
req := &calculatorpb.PrimeNumberDecompositionRequest{
Number: 12,
}
stream, err := c.PrimeNumberDecomposition(context.Background(), req)
if err != nil {
log.Fatalf("error while calling PrimeNumberDecomposition RPC: %v", err)
}
for {
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("error while reading stream: %v", err)
}
log.Println(res.GetPrimeFactor())
}
}
lesson25
客户端流 api
- 上传文件
- 同时处理多个数据
- 客户端向服务端推送
lesson26
syntax = "proto3";
package greet;
// 指定输出目录,是相对于运行protoc编译命令的目录
option go_package="./greet/greetpb";
// api间交换的数据结构
// 1,2等数字是数据的填充顺序
message Greeting {
string first_name = 1;
string last_name = 2;
}
// ...
message LongGreetRequest {
Greeting greeting = 1;
}
message LongGreetResponse {
string result = 1;
}
service GreetService{
// Unary
rpc Greet(GreetRequest) returns (GreetResponse) {};
// Server Streaming
rpc GreetManyTimes(GreetManyTimesRequest) returns (stream GreetManyTimesResponse) {};
// Client Streaming
rpc LongGreet(stream LongGreetRequest) returns (LongGreetResponse) {};
}
lesson27
package main
import (
"context"
"fmt"
"io"
"log"
"net"
"rpc-learn/greet/greetpb"
"strconv"
"time"
"google.golang.org/grpc"
)
type server struct{}
func (*server) Greet(ctx context.Context, req *greetpb.GreetRequest) (*greetpb.GreetResponse, error) {
// ...
}
func (*server) GreetManyTimes(req *greetpb.GreetManyTimesRequest, stream greetpb.GreetService_GreetManyTimesServer) error {
// ...
}
func (*server) LongGreet(stream greetpb.GreetService_LongGreetServer) error {
log.Println("LongGreet function was invoked with a streaming request")
result := ""
for {
req, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&greetpb.LongGreetResponse{
Result: result,
})
}
if err != nil {
log.Fatalf("Error while reading client stream: %v", err)
}
firstName := req.GetGreeting().GetFirstName()
result += "Hello " + firstName + "! "
}
}
func main() {
fmt.Println("Hello")
lis, err := net.Listen("tcp", "0.0.0.0:50051")
if err != nil {
log.Fatalf("Failed to listen: %v", err)
}
s := grpc.NewServer()
// 调用grpc,注册服务
greetpb.RegisterGreetServiceServer(s, &server{})
// 开启服务器
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
lesson28
package main
import (
"context"
"fmt"
"io"
"log"
"rpc-learn/greet/greetpb"
"time"
"google.golang.org/grpc"
)
func main() {
fmt.Println("client")
// WithInsecure: 不安全连接
conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
log.Fatalf("could not connect: %v", err)
}
defer conn.Close()
c := greetpb.NewGreetServiceClient(conn)
doClientStreaming(c)
}
func doUnary(c greetpb.GreetServiceClient) {
// ...
}
func doServerStreaming(c greetpb.GreetServiceClient) {
// ...
}
func doClientStreaming(c greetpb.GreetServiceClient) {
log.Println("Starting to do a Client Streaming RPC...")
requests := []*greetpb.LongGreetRequest{
&greetpb.LongGreetRequest{
Greeting: &greetpb.Greeting{
FirstName: "Stephane",
},
},
&greetpb.LongGreetRequest{
Greeting: &greetpb.Greeting{
FirstName: "John",
},
},
&greetpb.LongGreetRequest{
Greeting: &greetpb.Greeting{
FirstName: "Lucy",
},
},
&greetpb.LongGreetRequest{
Greeting: &greetpb.Greeting{
FirstName: "Mark",
},
},
&greetpb.LongGreetRequest{
Greeting: &greetpb.Greeting{
FirstName: "Piper",
},
},
}
stream, err := c.LongGreet(context.Background())
if err != nil {
log.Fatalf("error while calling LongGreet: %v", err)
}
for _, req := range requests {
log.Printf("Sending req: %v\n", req)
stream.Send(req)
time.Sleep(100 * time.Millisecond)
}
res, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("error while receiving response from LongGreet: %v", err)
}
log.Println("LongGreet Response:", res)
}
lesson29
syntax = "proto3";
package calculator;
option go_package = "./calculator/calculatorpb";
// ...
message ComputeAverageRequest {
int32 number = 1;
}
message ComputeAverageResponse {
double average = 1;
}
service CalculatorService {
// ...
rpc ComputeAverage(stream ComputeAverageRequest) returns (ComputeAverageResponse) {};
}
package main
import (
"context"
"fmt"
"io"
"log"
"net"
"rpc-learn/calculator/calculatorpb"
"google.golang.org/grpc"
)
type server struct{}
func (*server) Sum(ctx context.Context, req *calculatorpb.SumRequest) (*calculatorpb.SumResponse, error) {
// ...
}
func (*server) PrimeNumberDecomposition(
req *calculatorpb.PrimeNumberDecompositionRequest,
stream calculatorpb.CalculatorService_PrimeNumberDecompositionServer) error {
// ...
}
func (*server) ComputeAverage(
stream calculatorpb.CalculatorService_ComputeAverageServer) error {
log.Println("Received ComputeAverage RPC")
sum := int32(0)
count := 0
for {
req, err := stream.Recv()
if err == io.EOF {
average := float64(sum) / float64(count)
return stream.SendAndClose(&calculatorpb.ComputeAverageResponse{
Average: average,
})
}
if err != nil {
log.Fatalf("Error while reading client stream: %v", err)
}
sum += req.GetNumber()
count++
}
}
func main() {
// ...
}
package main
import (
"context"
"fmt"
"io"
"log"
"rpc-learn/calculator/calculatorpb"
"google.golang.org/grpc"
)
func main() {
fmt.Println("Calculator Client")
// WithInsecure: 不安全连接
conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
log.Fatalf("could not connect: %v", err)
}
defer conn.Close()
c := calculatorpb.NewCalculatorServiceClient(conn)
doClientStreaming(c)
}
func doUnary(c calculatorpb.CalculatorServiceClient) {
// ...
}
func doServerStreaming(c calculatorpb.CalculatorServiceClient) {
// ...
}
func doClientStreaming(c calculatorpb.CalculatorServiceClient) {
log.Println("Starting to do a ComputeAverage Client Streaming RPC...")
stream, err := c.ComputeAverage(context.Background())
if err != nil {
log.Fatalf("Error while opening stream: %v", err)
}
numbers := []int32{3, 5, 9, 54, 23}
for _, number := range numbers {
log.Printf("Sending number: %v\n", number)
stream.Send(&calculatorpb.ComputeAverageRequest{
Number: number,
})
}
res, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("Error while receiving response: %v", err)
}
log.Println("The Average is:", res.GetAverage())
}
lesson30
lesson31
syntax = "proto3";
package greet;
// 指定输出目录,是相对于运行protoc编译命令的目录
option go_package="./greet/greetpb";
// api间交换的数据结构
// 1,2等数字是数据的填充顺序
message Greeting {
string first_name = 1;
string last_name = 2;
}
// ...
message GreetEveryoneRequest {
Greeting greeting = 1;
}
message GreetEveryoneResponse {
string result = 1;
}
service GreetService{
// Unary
rpc Greet(GreetRequest) returns (GreetResponse) {};
// Server Streaming
rpc GreetManyTimes(GreetManyTimesRequest) returns (stream GreetManyTimesResponse) {};
// Client Streaming
rpc LongGreet(stream LongGreetRequest) returns (LongGreetResponse) {};
// BiDi Streaming
rpc GreetEveryone(stream GreetEveryoneRequest) returns (stream GreetEveryoneResponse) {};
}
lesson32
package main
import (
"context"
"fmt"
"io"
"log"
"net"
"rpc-learn/greet/greetpb"
"strconv"
"time"
"google.golang.org/grpc"
)
type server struct{}
func (*server) Greet(ctx context.Context, req *greetpb.GreetRequest) (*greetpb.GreetResponse, error) {
// ...
}
func (*server) GreetManyTimes(req *greetpb.GreetManyTimesRequest, stream greetpb.GreetService_GreetManyTimesServer) error {
// ...
}
func (*server) LongGreet(stream greetpb.GreetService_LongGreetServer) error {
// ...
}
func (*server) GreetEveryone(stream greetpb.GreetService_GreetEveryoneServer) error {
log.Println("GreetEveryone function was invoked with a streaming request")
for {
req, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
log.Fatalf("Error while reading client stream: %v", err)
return err
}
firstName := req.Greeting.FirstName
result := "Hello " + firstName + "! "
err = stream.Send(&greetpb.GreetEveryoneResponse{
Result: result,
})
if err != nil {
log.Fatalf("Error while sending data to client: %v", err)
return err
}
}
}
func main() {
// ...
}
lesson33
package main
import (
"context"
"fmt"
"io"
"log"
"rpc-learn/greet/greetpb"
"time"
"google.golang.org/grpc"
)
func main() {
fmt.Println("client")
// WithInsecure: 不安全连接
conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
log.Fatalf("could not connect: %v", err)
}
defer conn.Close()
c := greetpb.NewGreetServiceClient(conn)
doBiDiStreaming(c)
}
func doUnary(c greetpb.GreetServiceClient) {
// ...
}
func doServerStreaming(c greetpb.GreetServiceClient) {
// ...
}
func doClientStreaming(c greetpb.GreetServiceClient) {
// ...
}
func doBiDiStreaming(c greetpb.GreetServiceClient) {
log.Println("Starting to do a BiDi Streaming RPC...")
requests := []*greetpb.GreetEveryoneRequest{
&greetpb.GreetEveryoneRequest{
Greeting: &greetpb.Greeting{
FirstName: "Stephane",
},
},
&greetpb.GreetEveryoneRequest{
Greeting: &greetpb.Greeting{
FirstName: "John",
},
},
&greetpb.GreetEveryoneRequest{
Greeting: &greetpb.Greeting{
FirstName: "Lucy",
},
},
&greetpb.GreetEveryoneRequest{
Greeting: &greetpb.Greeting{
FirstName: "Mark",
},
},
&greetpb.GreetEveryoneRequest{
Greeting: &greetpb.Greeting{
FirstName: "Piper",
},
},
}
// we create a stream by invoking the client
stream, err := c.GreetEveryone(context.Background())
if err != nil {
log.Fatalf("error while creating stream:", err)
return
}
watic := make(chan struct{})
// we send a bunch of message to the client (go routine)
go func() {
// function to send a bunch of message
for _, req := range requests {
log.Println("Sending message:", req)
stream.Send(req)
time.Sleep(1000 * time.Millisecond)
}
stream.CloseSend()
}()
// we receive a bunch of messages from the client (go routine)
go func() {
// function to receive a bunch of message
for {
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("error while receving: %v", err)
break
}
log.Println("Received:", res.GetResult())
}
close(watic)
}()
// block until everything is done
<-watic
}
lesson34
syntax = "proto3";
package calculator;
option go_package = "./calculator/calculatorpb";
// ...
message FindMaxiumRequest {
int32 number = 1;
}
message FindMaxiumResponse {
int32 maxium = 1;
}
service CalculatorService {
// ...
rpc FindMaxium(stream FindMaxiumRequest) returns (stream FindMaxiumResponse) {};
}
package main
import (
"context"
"fmt"
"io"
"log"
"net"
"rpc-learn/calculator/calculatorpb"
"google.golang.org/grpc"
)
type server struct{}
func (*server) Sum(ctx context.Context, req *calculatorpb.SumRequest) (*calculatorpb.SumResponse, error) {
// ...
}
func (*server) PrimeNumberDecomposition(
req *calculatorpb.PrimeNumberDecompositionRequest,
stream calculatorpb.CalculatorService_PrimeNumberDecompositionServer) error {
// ...
}
func (*server) ComputeAverage(
stream calculatorpb.CalculatorService_ComputeAverageServer) error {
// ...
}
func (*server) FindMaxium(stream calculatorpb.CalculatorService_FindMaxiumServer) error {
log.Println("Received FindMaxium RPC")
maxium := int32(0)
for {
req, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
log.Fatalf("Error while reading client stream: %v", err)
return err
}
number := req.GetNumber()
if number > maxium {
maxium = number
err = stream.Send(&calculatorpb.FindMaxiumResponse{
Maxium: maxium,
})
if err != nil {
log.Fatalf("Error while sending client stream: %v", err)
return err
}
}
}
}
func main() {
// ...
}
package main
import (
"context"
"fmt"
"io"
"log"
"rpc-learn/calculator/calculatorpb"
"time"
"google.golang.org/grpc"
)
func main() {
fmt.Println("Calculator Client")
// WithInsecure: 不安全连接
conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
log.Fatalf("could not connect: %v", err)
}
defer conn.Close()
c := calculatorpb.NewCalculatorServiceClient(conn)
doBiDiStreaming(c)
}
func doUnary(c calculatorpb.CalculatorServiceClient) {
// ...
}
func doServerStreaming(c calculatorpb.CalculatorServiceClient) {
// ...
}
func doClientStreaming(c calculatorpb.CalculatorServiceClient) {
// ...
}
func doBiDiStreaming(c calculatorpb.CalculatorServiceClient) {
log.Println("Starting to do a FindMaxium BiDi Streaming RPC...")
stream, err := c.FindMaxium(context.Background())
if err != nil {
log.Fatalf("Error while opening stream and calling FindMaxium: %v", err)
}
waitc := make(chan struct{})
// send go routine
go func() {
numbers := []int32{4, 7, 2, 19, 4, 6, 32}
for _, number := range numbers {
log.Println("Sending number:", number)
stream.Send(&calculatorpb.FindMaxiumRequest{
Number: number,
})
time.Sleep(1000 * time.Millisecond)
}
stream.CloseSend()
}()
// receive go routine
go func() {
for {
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("Problem while reading server stream: %v", err)
break
}
maxium := res.GetMaxium()
log.Println("Received a new maxium of...:", maxium)
}
close(waitc)
}()
<-waitc
}
lesson35
lesson36 – Error
syntax = "proto3";
package calculator;
option go_package = "./calculator/calculatorpb";
// ...
message SquareRootRequest {
int32 number = 1;
}
message SquareRootResponse {
double number_root = 1;
}
service CalculatorService {
// ...
// error handling
// this RPC will throw an exception if the send number is negative
// The error being send it of type INVALID_ARGUMENT
rpc SquareRoot(SquareRootRequest) returns (SquareRootResponse) {};
}
package main
import (
"context"
"fmt"
"io"
"log"
"math"
"net"
"rpc-learn/calculator/calculatorpb"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type server struct{}
func (*server) Sum(ctx context.Context, req *calculatorpb.SumRequest) (*calculatorpb.SumResponse, error) {
// ...
}
func (*server) PrimeNumberDecomposition(
req *calculatorpb.PrimeNumberDecompositionRequest,
stream calculatorpb.CalculatorService_PrimeNumberDecompositionServer) error {
// ...
}
func (*server) ComputeAverage(
stream calculatorpb.CalculatorService_ComputeAverageServer) error {
//...
}
func (*server) FindMaxium(stream calculatorpb.CalculatorService_FindMaxiumServer) error {
// ...
}
func (*server) SquareRoot(
ctx context.Context, req *calculatorpb.SquareRootRequest) (
*calculatorpb.SquareRootResponse, error) {
log.Println("Received SquareRoot RPC")
number := req.GetNumber()
if number < 0 {
return nil, status.Errorf(
codes.InvalidArgument,
fmt.Sprintf("Received a negative number: %v", number),
)
}
return &calculatorpb.SquareRootResponse{
NumberRoot: math.Sqrt(float64(number)),
}, nil
}
func main() {
// ...
}
package main
import (
"context"
"fmt"
"io"
"log"
"rpc-learn/calculator/calculatorpb"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func main() {
fmt.Println("Calculator Client")
// WithInsecure: 不安全连接
conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
log.Fatalf("could not connect: %v", err)
}
defer conn.Close()
c := calculatorpb.NewCalculatorServiceClient(conn)
doErrorUnary(c)
}
func doUnary(c calculatorpb.CalculatorServiceClient) {
// ...
}
func doServerStreaming(c calculatorpb.CalculatorServiceClient) {
// ...
}
func doClientStreaming(c calculatorpb.CalculatorServiceClient) {
// ...
}
func doBiDiStreaming(c calculatorpb.CalculatorServiceClient) {
// ...
}
func doErrorUnary(c calculatorpb.CalculatorServiceClient) {
log.Println("Starting to be a SquareRoot Unary RPC...")
doErrorCall(c, 10)
doErrorCall(c, -2)
}
func doErrorCall(c calculatorpb.CalculatorServiceClient, n int32) {
res, err := c.SquareRoot(context.Background(), &calculatorpb.SquareRootRequest{Number: n})
if err != nil {
respErr, ok := status.FromError(err)
if ok {
// actual error from grpc (user error)
log.Println("Error message from server:", respErr.Message())
log.Println(respErr.Code())
if respErr.Code() == codes.InvalidArgument {
log.Println("We probably sent a negative number!")
return
}
} else {
log.Fatalf("Big Error calling SquareRoot: %v", err)
return
}
}
log.Println("Result of square root of %v: %v", n, res.GetNumberRoot())
}
lesson37 – Deadline
lesson38
syntax = "proto3";
package greet;
// 指定输出目录,是相对于运行protoc编译命令的目录
option go_package="./greet/greetpb";
// api间交换的数据结构
// 1,2等数字是数据的填充顺序
message Greeting {
string first_name = 1;
string last_name = 2;
}
// ...
message GreetWithDeadlineRequest {
Greeting greeting = 1;
}
message GreetWithDeadlineResponse {
string result = 1;
}
service GreetService{
// ...
// Unary With Deadline
rpc GreetWithDeadline(GreetWithDeadlineRequest) returns (GreetWithDeadlineResponse) {};
}
package main
import (
"context"
"fmt"
"io"
"log"
"net"
"rpc-learn/greet/greetpb"
"strconv"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type server struct{}
func (*server) Greet(ctx context.Context, req *greetpb.GreetRequest) (*greetpb.GreetResponse, error) {
// ...
}
func (*server) GreetManyTimes(req *greetpb.GreetManyTimesRequest, stream greetpb.GreetService_GreetManyTimesServer) error {
// ...
}
func (*server) LongGreet(stream greetpb.GreetService_LongGreetServer) error {
// ...
}
func (*server) GreetEveryone(stream greetpb.GreetService_GreetEveryoneServer) error {
// ...
}
func (*server) GreetWithDeadline(
ctx context.Context, req *greetpb.GreetWithDeadlineRequest) (
*greetpb.GreetWithDeadlineResponse, error) {
log.Printf("GreetWithDeadline function was invoked with %v\n", req)
for i := 0; i < 3; i++ {
if ctx.Err() == context.Canceled {
// the client canceled the request
log.Println("The client canceled the request!")
return nil, status.Error(codes.DeadlineExceeded, "the client canceled the request")
}
time.Sleep(1 * time.Second)
}
firstName := req.GetGreeting().GetFirstName()
result := "Hello " + firstName
res := &greetpb.GreetWithDeadlineResponse{
Result: result,
}
return res, nil
}
func main() {
// ...
}
package main
import (
"context"
"fmt"
"io"
"log"
"rpc-learn/greet/greetpb"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func main() {
fmt.Println("client")
// WithInsecure: 不安全连接
conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
log.Fatalf("could not connect: %v", err)
}
defer conn.Close()
c := greetpb.NewGreetServiceClient(conn)
doUnaryWithDetail(c, 5*time.Second)
doUnaryWithDetail(c, 1*time.Second)
}
func doUnary(c greetpb.GreetServiceClient) {
// ...
}
func doServerStreaming(c greetpb.GreetServiceClient) {
// ...
}
func doClientStreaming(c greetpb.GreetServiceClient) {
// ...
}
func doBiDiStreaming(c greetpb.GreetServiceClient) {
// ...
}
func doUnaryWithDetail(c greetpb.GreetServiceClient, timeout time.Duration) {
log.Println("Starting to do a UnaryWithDetail RPC...")
req := &greetpb.GreetWithDeadlineRequest{
Greeting: &greetpb.Greeting{
FirstName: "Stephane",
LastName: "Maarek",
},
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel() // 超时,退出
res, err := c.GreetWithDeadline(ctx, req)
if err != nil {
statusErr, ok := status.FromError(err)
if ok {
if statusErr.Code() == codes.DeadlineExceeded {
log.Println("Timeout was hit! Deadline was exceeded")
} else {
log.Println("error while calling GreetWithDeadline RPC:", err)
}
} else {
log.Fatalf("error while calling GreetWithDetail RPC: %v", err)
}
return
}
log.Printf("Response from GreetWithDetail: %v", res.Result)
}
lesson39 – SSL
lesson40
#!/bin/bash
SERVER_CN=localhost
openssl genrsa -passout pass:1111 -des3 -out ca.key 4096
openssl req -passin pass:1111 -new -x509 -days 365 -key ca.key -out ca.crt -subj "/CN=${SERVER_CN}"
openssl genrsa -passout pass:1111 -des3 -out server.key 4096
openssl req -passin pass:1111 -new -key server.key -out server.csr -subj "/CN=${SERVER_CN}"
openssl x509 -req -passin pass:1111 -days 365 -in server.csr -CA ca.crt -CAkey ca.key -set_serial 01 -out server.crt
openssl pkcs8 -topk8 -nocrypt -passin pass:1111 -in server.key -out server.pem
用不了,跳
lesson41
java 的 grpc 出问题,不搞了
lesson42–reflection&cli
import "google.golang.org/grpc/reflection"
evans -p 50051 -r
show package
show service
show message
# 展示message详细信息
desc SumRequest
# 切换到包下
package calculator
# 切换到服务
service CalculatorService
# 调用服务
call Sum
lesson43–grpc with MongoDB
lesson44
lesson45
syntax = "proto3";
package blog;
option go_package = "./blog/blogpb";
message Blog {
string id = 1;
string author_id = 2;
string title = 3;
string content = 4;
}
service BlogService {
}
package main
import (
"fmt"
"os"
"os/signal"
"rpc-learn/blog/blogpb"
"log"
"net"
"google.golang.org/grpc"
)
type server struct{}
func main() {
// if we crash the go code, we get the file name and line number
log.SetFlags(log.LstdFlags | log.Lshortfile)
fmt.Println("Blog server started")
lis, err := net.Listen("tcp", "localhost:50051")
if err != nil {
log.Fatalf("Failed to listen: %v", err)
}
opts := []grpc.ServerOption{}
s := grpc.NewServer(opts...)
// 调用grpc,注册服务
blogpb.RegisterBlogServiceServer(s, &server{})
go func() {
// 开启服务器
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}()
// wait for ctrl-c to exit
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
// block until a signal is received
<-ch
log.Println("Stopping the server")
s.Stop()
log.Println("Stopping the listener")
lis.Close()
log.Println("End of Program")
}
lesson46
package main
import (
"context"
"fmt"
"os"
"os/signal"
"rpc-learn/blog/blogpb"
// "io"
"log"
"net"
// "strconv"
// "time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"google.golang.org/grpc"
// "google.golang.org/grpc/codes"
// "google.golang.org/grpc/status"
)
type server struct{}
type blogItem struct {
ID string `bson:"_id,omitempty"`
AuthorID string `bson:"author_id`
Content string `bson:"content"`
Title string `bson:"title"`
}
func main() {
// if we crash the go code, we get the file name and line number
log.SetFlags(log.LstdFlags | log.Lshortfile)
fmt.Println("Blog server started")
serverAPI := options.ServerAPI(options.ServerAPIVersion1)
mongoOpts := options.Client().ApplyURI("mongodb://localhost:27017").SetServerAPIOptions(serverAPI)
client, err := mongo.Connect(context.TODO(), mongoOpts)
if err != nil {
{
panic(err)
}
}
// defer func() {
// if err = client.Disconnect(context.TODO()); err != nil {
// panic(err)
// }
// }()
var result bson.M
if err := client.Database("admin").RunCommand(context.TODO(), bson.D{{"ping", 1}}).Decode(&result); err != nil {
panic(err)
}
fmt.Println("Pinged your deployment. You successfully connected to MongoDB!")
lis, err := net.Listen("tcp", "localhost:50051")
if err != nil {
log.Fatalf("Failed to listen: %v", err)
}
opts := []grpc.ServerOption{}
s := grpc.NewServer(opts...)
// 调用grpc,注册服务
blogpb.RegisterBlogServiceServer(s, &server{})
go func() {
// 开启服务器
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}()
// wait for ctrl-c to exit
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
// block until a signal is received
<-ch
log.Println("Stopping the server")
s.Stop()
log.Println("Stopping the listener")
lis.Close()
log.Println("Closing MongoDB Connection")
client.Disconnect(context.TODO())
log.Println("End of Program")
}
lesson47
package main
import (
"context"
"fmt"
"os"
"os/signal"
"rpc-learn/blog/blogpb"
// "io"
"log"
"net"
// "strconv"
// "time"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type blogItem struct {
ID primitive.ObjectID `bson:"_id,omitempty"`
AuthorID string `bson:"author_id`
Content string `bson:"content"`
Title string `bson:"title"`
}
var coll *mongo.Collection
type server struct{}
func (*server) CreateBlog(ctx context.Context, req *blogpb.CreateBlogRequest) (*blogpb.CreateBlogResponse, error) {
blog := req.GetBlog()
data := blogItem{
AuthorID: blog.GetAuthorId(),
Title: blog.GetTitle(),
Content: blog.GetContent(),
}
res, err := coll.InsertOne(context.Background(), data)
if err != nil {
return nil, status.Errorf(codes.Internal,
fmt.Sprintf("Internal error: %v", err))
}
oid,ok:= res.InsertedID.(primitive.ObjectID)
if !ok{
return nil , status.Errorf(codes.Internal,
fmt.Sprintf("Cannot convert to OID"))
}
return &blogpb.CreateBlogResponse{
Blog: &blogpb.Blog{
Id: oid.Hex(),
AuthorId: blog.GetAuthorId(),
Title: blog.GetTitle(),
Content: blog.GetContent(),
},
}, nil
}
func main() {
// if we crash the go code, we get the file name and line number
log.SetFlags(log.LstdFlags | log.Lshortfile)
fmt.Println("Blog server started")
serverAPI := options.ServerAPI(options.ServerAPIVersion1)
mongoOpts := options.Client().ApplyURI("mongodb://localhost:27017").SetServerAPIOptions(serverAPI)
client, err := mongo.Connect(context.TODO(), mongoOpts)
if err != nil {
{
panic(err)
}
}
// defer func() {
// if err = client.Disconnect(context.TODO()); err != nil {
// panic(err)
// }
// }()
coll = client.Database("sample_mflix").Collection("blogs")
fmt.Println("Pinged your deployment. You successfully connected to MongoDB!")
lis, err := net.Listen("tcp", "localhost:50051")
if err != nil {
log.Fatalf("Failed to listen: %v", err)
}
opts := []grpc.ServerOption{}
s := grpc.NewServer(opts...)
// 调用grpc,注册服务
blogpb.RegisterBlogServiceServer(s, &server{})
go func() {
// 开启服务器
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}()
// wait for ctrl-c to exit
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
// block until a signal is received
<-ch
log.Println("Stopping the server")
s.Stop()
log.Println("Stopping the listener")
lis.Close()
log.Println("Closing MongoDB Connection")
client.Disconnect(context.TODO())
log.Println("End of Program")
}
lesson48
package main
import (
"context"
"fmt"
// "io"
"log"
"rpc-learn/blog/blogpb"
// "time"
"google.golang.org/grpc"
// "google.golang.org/grpc/codes"
// "google.golang.org/grpc/credentials"
// "google.golang.org/grpc/status"
)
func main() {
fmt.Println("blog client")
opts := grpc.WithInsecure()
conn, err := grpc.Dial("localhost:50051", opts)
if err != nil {
log.Fatalf("could not connect: %v", err)
}
defer conn.Close()
c := blogpb.NewBlogServiceClient(conn)
log.Println("Creating the blog")
blog := &blogpb.Blog{
AuthorId: "Stephane",
Title: "My First Blog",
Content: "Content of the first blog",
}
res, err := c.CreateBlog(context.Background(), &blogpb.CreateBlogRequest{
Blog: blog,
})
if err != nil {
log.Fatalf("Unexpected error creating blog: %v", err)
}
log.Println("Blog has been created:", res)
}
lesson49
syntax = "proto3";
package blog;
option go_package = "./blog/blogpb";
message Blog {
string id = 1;
string author_id = 2;
string title = 3;
string content = 4;
}
// ...
message ReadBlogRequest {
string blog_id = 1;
}
message ReadBlogResponse {
Blog blog = 1; // will have a blog id
}
service BlogService {
rpc CreateBlog (CreateBlogRequest) returns (CreateBlogResponse);
// return NOT FOUND if not found
rpc ReadBlog (ReadBlogRequest) returns (ReadBlogResponse);
}
package main
import (
"context"
"fmt"
"os"
"os/signal"
"rpc-learn/blog/blogpb"
// "io"
"log"
"net"
// "strconv"
// "time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type blogItem struct {
ID primitive.ObjectID `bson:"_id,omitempty"`
AuthorID string `bson:"author_id`
Content string `bson:"content"`
Title string `bson:"title"`
}
var coll *mongo.Collection
type server struct{}
func (*server) CreateBlog(ctx context.Context, req *blogpb.CreateBlogRequest) (*blogpb.CreateBlogResponse, error) {
// ...
}
func (*server) ReadBlog(ctx context.Context, req *blogpb.ReadBlogRequest) (*blogpb.ReadBlogResponse, error) {
log.Println("Read blog request")
blogId := req.GetBlogId()
oid, err := primitive.ObjectIDFromHex(blogId)
if err != nil {
return nil, status.Errorf(
codes.InvalidArgument,
fmt.Sprintf("Cannot parse ID"))
}
// create a empty struct
data := &blogItem{}
filter := bson.D{{"_id", oid}}
res := coll.FindOne(context.Background(), filter)
if err := res.Decode(data); err != nil {
return nil, status.Errorf(
codes.NotFound,
fmt.Sprintf("Cannot find blog with specified ID %v", err))
}
return &blogpb.ReadBlogResponse{
Blog: &blogpb.Blog{
Id: data.ID.Hex(),
AuthorId: data.AuthorID,
Content: data.Content,
Title: data.Title,
},
}, nil
}
func main() {
// ...
}
lesson50
package main
import (
"context"
"fmt"
"log"
"rpc-learn/blog/blogpb"
"google.golang.org/grpc"
)
func main() {
fmt.Println("blog client")
opts := grpc.WithInsecure()
conn, err := grpc.Dial("localhost:50051", opts)
if err != nil {
log.Fatalf("could not connect: %v", err)
}
defer conn.Close()
c := blogpb.NewBlogServiceClient(conn)
// create
// blog := &blogpb.Blog{
// AuthorId: "Stephane",
// Title: "My First Blog",
// Content: "Content of the first blog",
// }
// create(c, blog)
// find
findOne(c, "64a5f45f94466f3e02f74f79")
}
func create(c blogpb.BlogServiceClient, blog *blogpb.Blog) {
log.Println("Creating the blog")
res, err := c.CreateBlog(context.Background(), &blogpb.CreateBlogRequest{
Blog: blog,
})
if err != nil {
log.Fatalf("Unexpected error creating blog: %v", err)
}
log.Println("Blog has been created:", res)
}
func findOne(c blogpb.BlogServiceClient, id string) {
log.Println("Reading the blog")
res, err := c.ReadBlog(context.Background(), &blogpb.ReadBlogRequest{
BlogId: id,
})
if err != nil {
log.Fatalf("Unexpected error reading blog: %v", err)
}
log.Println("Blog has been read:", res)
}
lesson51
syntax = "proto3";
package blog;
option go_package = "./blog/blogpb";
message Blog {
string id = 1;
string author_id = 2;
string title = 3;
string content = 4;
}
// ...
message UpdateBlogRequest {
Blog blog = 1;
}
message UpdateBlogResponse {
Blog blog = 1;
}
service BlogService {
rpc CreateBlog (CreateBlogRequest) returns (CreateBlogResponse);
// return NOT FOUND if not found
rpc ReadBlog (ReadBlogRequest) returns (ReadBlogResponse);
rpc UpdateBlog (UpdateBlogRequest) returns (UpdateBlogResponse);
}
package main
import (
// ...
)
type blogItem struct {
// ...
}
var coll *mongo.Collection
type server struct{}
func (*server) CreateBlog(ctx context.Context, req *blogpb.CreateBlogRequest) (*blogpb.CreateBlogResponse, error) {
// ...
}
func (*server) ReadBlog(ctx context.Context, req *blogpb.ReadBlogRequest) (*blogpb.ReadBlogResponse, error) {
// ...
}
func (*server) UpdateBlog(
ctx context.Context, req *blogpb.UpdateBlogRequest,
) (
*blogpb.UpdateBlogResponse, error,
) {
log.Println("Updating blog request")
blog := req.GetBlog()
oid, err := primitive.ObjectIDFromHex(blog.GetId())
if err != nil {
return nil, status.Errorf(
codes.InvalidArgument,
fmt.Sprintf("Cannot parse ID"))
}
// create a empty struct
data := &blogItem{}
filter := bson.D{{"_id", oid}}
res := coll.FindOne(context.Background(), filter)
if err := res.Decode(data); err != nil {
return nil, status.Errorf(
codes.NotFound,
fmt.Sprintf("Cannot find blog with specified ID %v", err))
}
data.AuthorID = blog.GetAuthorId()
data.Content = blog.GetContent()
data.Title = blog.GetTitle()
_, updateErr := coll.ReplaceOne(context.Background(), filter, data)
if updateErr != nil {
return nil, status.Errorf(
codes.Internal,
fmt.Sprintf("Cannot update object:%v", updateErr),
)
}
return &blogpb.UpdateBlogResponse{
Blog: dataToBlogpb(data),
}, nil
}
func dataToBlogpb(data *blogItem) *blogpb.Blog {
return &blogpb.Blog{
Id: data.ID.Hex(),
AuthorId: data.AuthorID,
Content: data.Content,
Title: data.Title,
}
}
func main() {
// ...
}
lesson52
package main
import (
// ...
)
func main() {
fmt.Println("blog client")
opts := grpc.WithInsecure()
conn, err := grpc.Dial("localhost:50051", opts)
if err != nil {
log.Fatalf("could not connect: %v", err)
}
defer conn.Close()
c := blogpb.NewBlogServiceClient(conn)
// update
blog := &blogpb.Blog{
Id: "64a5f45f94466f3e02f74f79",
AuthorId: "changer",
Title: "My change Blog",
Content: "Content of the change blog",
}
update(c, blog)
}
func create(c blogpb.BlogServiceClient, blog *blogpb.Blog) {
// ...
}
func findOne(c blogpb.BlogServiceClient, id string) {
// ...
}
func update(c blogpb.BlogServiceClient, blog *blogpb.Blog) {
log.Println("Updating the blog")
uptRes, uptErr := c.UpdateBlog(context.Background(), &blogpb.UpdateBlogRequest{
Blog: blog,
})
if uptErr != nil {
log.Fatalf("Error while updating: %v", uptErr)
}
log.Println("Blog was read:", uptRes)
}
lesson53
syntax = "proto3";
package blog;
option go_package = "./blog/blogpb";
message Blog {
string id = 1;
string author_id = 2;
string title = 3;
string content = 4;
}
// ...
message DeleteBlogRequest {
string blog_id = 1;
}
message DeleteBlogResponse {
string blog_id = 1;
}
service BlogService {
rpc CreateBlog (CreateBlogRequest) returns (CreateBlogResponse);
// return NOT FOUND if not found
rpc ReadBlog (ReadBlogRequest) returns (ReadBlogResponse);
rpc UpdateBlog (UpdateBlogRequest) returns (UpdateBlogResponse);
rpc DeleteBlog (DeleteBlogRequest) returns (DeleteBlogResponse);
}
package main
import (
// ...
)
type blogItem struct {
// ...
}
var coll *mongo.Collection
type server struct{}
func (*server) CreateBlog(ctx context.Context, req *blogpb.CreateBlogRequest) (*blogpb.CreateBlogResponse, error) {
// ...
}
func (*server) ReadBlog(ctx context.Context, req *blogpb.ReadBlogRequest) (*blogpb.ReadBlogResponse, error) {
// ...
}
func (*server) UpdateBlog(
ctx context.Context, req *blogpb.UpdateBlogRequest,
) (
*blogpb.UpdateBlogResponse, error,
) {
// ...
}
func (*server) DeleteBlog(
ctx context.Context, req *blogpb.DeleteBlogRequest,
) (
*blogpb.DeleteBlogResponse, error,
) {
log.Println("Deleting blog request")
oid, err := primitive.ObjectIDFromHex(req.GetBlogId())
if err != nil {
return nil, status.Errorf(
codes.InvalidArgument,
fmt.Sprintf("Cannot parse ID"),
)
}
filter := bson.D{{"_id", oid}}
res, err := coll.DeleteOne(context.Background(), filter)
if err != nil {
return nil, status.Errorf(
codes.Internal,
fmt.Sprintf("Cannot delete object: %v", err),
)
}
if res.DeletedCount == 0 {
return nil, status.Errorf(
codes.Internal,
fmt.Sprintf("Cannot find object: %v", err),
)
}
return &blogpb.DeleteBlogResponse{BlogId: req.GetBlogId()}, nil
}
func dataToBlogpb(data *blogItem) *blogpb.Blog {
// ...
}
func main() {
// ...
}
lesson54
package main
import (
// ...
)
func main() {
fmt.Println("blog client")
opts := grpc.WithInsecure()
conn, err := grpc.Dial("localhost:50051", opts)
if err != nil {
log.Fatalf("could not connect: %v", err)
}
defer conn.Close()
c := blogpb.NewBlogServiceClient(conn)
delete(c, "64a627df7303d9f27d4f19a8")
}
func create(c blogpb.BlogServiceClient, blog *blogpb.Blog) {
// ...
}
func findOne(c blogpb.BlogServiceClient, id string) {
// ...
}
func update(c blogpb.BlogServiceClient, blog *blogpb.Blog) {
// ...
}
func delete(c blogpb.BlogServiceClient, id string) {
log.Println("Deleting the blog")
delRes, delErr := c.DeleteBlog(context.Background(), &blogpb.DeleteBlogRequest{
BlogId: id,
})
if delErr != nil {
log.Fatalf("Error while deleting: %v", delErr)
}
log.Println("Blog was read:", delRes)
}
lesson55
syntax = "proto3";
package blog;
option go_package = "./blog/blogpb";
message Blog {
string id = 1;
string author_id = 2;
string title = 3;
string content = 4;
}
// ...
message ListBlogRequest {
}
message ListBlogResponse {
Blog blog = 1;
}
service BlogService {
rpc CreateBlog (CreateBlogRequest) returns (CreateBlogResponse);
// return NOT FOUND if not found
rpc ReadBlog (ReadBlogRequest) returns (ReadBlogResponse);
rpc UpdateBlog (UpdateBlogRequest) returns (UpdateBlogResponse);
rpc DeleteBlog (DeleteBlogRequest) returns (DeleteBlogResponse);
rpc ListBlog (ListBlogRequest) returns (stream ListBlogResponse);
}
package main
import (
// ...
)
type blogItem struct {
// ...
}
var coll *mongo.Collection
type server struct{}
func (*server) CreateBlog(ctx context.Context, req *blogpb.CreateBlogRequest) (*blogpb.CreateBlogResponse, error) {
// ...
}
func (*server) ReadBlog(ctx context.Context, req *blogpb.ReadBlogRequest) (*blogpb.ReadBlogResponse, error) {
// ...
}
func (*server) UpdateBlog(
ctx context.Context, req *blogpb.UpdateBlogRequest,
) (
*blogpb.UpdateBlogResponse, error,
) {
// ...
}
func (*server) DeleteBlog(
ctx context.Context, req *blogpb.DeleteBlogRequest,
) (
*blogpb.DeleteBlogResponse, error,
) {
// ...
}
func (*server) ListBlog(
req *blogpb.ListBlogRequest, stream blogpb.BlogService_ListBlogServer,
) error {
log.Println("List blog request")
// 查询所有要给一个空的filter
cur, err := coll.Find(context.Background(), bson.D{})
if err != nil {
return status.Errorf(
codes.Internal,
fmt.Sprintf("Unknown internal error: %v", err),
)
}
defer cur.Close(context.Background())
for cur.Next(context.Background()) {
data := &blogItem{}
err := cur.Decode(data)
if err != nil {
return status.Errorf(
codes.Internal,
fmt.Sprintf("Error while decoding data: %v", err),
)
}
stream.Send(&blogpb.ListBlogResponse{
Blog: dataToBlogpb(data),
})
}
if err := cur.Err(); err != nil {
return status.Errorf(
codes.Internal,
fmt.Sprintf("Unknown internal error: %v", err),
)
}
return nil
}
func dataToBlogpb(data *blogItem) *blogpb.Blog {
// ...
}
func main() {
// ...
}
lesson56
package main
import (
// ...
)
func main() {
fmt.Println("blog client")
opts := grpc.WithInsecure()
conn, err := grpc.Dial("localhost:50051", opts)
if err != nil {
log.Fatalf("could not connect: %v", err)
}
defer conn.Close()
c := blogpb.NewBlogServiceClient(conn)
// list
list(c)
}
func create(c blogpb.BlogServiceClient, blog *blogpb.Blog) {
// ...
}
func findOne(c blogpb.BlogServiceClient, id string) {
// ...
}
func update(c blogpb.BlogServiceClient, blog *blogpb.Blog) {
// ...
}
func delete(c blogpb.BlogServiceClient, id string) {
// ...
}
func list(c blogpb.BlogServiceClient) {
stream, err := c.ListBlog(context.Background(), &blogpb.ListBlogRequest{})
if err != nil {
log.Fatalf("error while calling ListBlog RPC: %v", err)
}
for {
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("error while reading ListBlog RPC: %v", err)
}
log.Println(res.GetBlog())
}
}