gRPC Golang Master Class Build Modern API & Microservices

gRPC Golang Master Class Build Modern API & Microservices

lesson1













lesson2





lesson4








lesson5









lesson6



lesson7

lesson8

lesson9


lesson10

lesson11

go get -u google.golang.org/grpc
go get -u github.com/golang/protobuf/protoc-gen-go

lesson12

syntax = "proto3";

package greet;
option go_package="greetpb";

service GreetService{}
protoc greet/greetpb/greet.proto --go_out=plugins=grpc:.

如果运行 protoc 时报错(提示 go_package 不是合法的 Go 导入路径,必须包含 "." 或 "/"),需要修改 option go_package 的值,让它以 ./ 开头:

// Greet service definition, written in proto3 syntax.
syntax = "proto3";

package greet;
// Go package for the generated code. The path is also used as the output
// directory, relative to the --go_out directory passed to protoc.
option go_package="./greet/greetpb";

// GreetService is declared without any RPCs.
service GreetService{}

lesson13

package main

import (
    "fmt"
    "log"
    "net"
    "rpc-learn/greet/greetpb"

    "google.golang.org/grpc"
)

type server struct{}

// main listens on TCP port 50051, registers the Greet service on a new gRPC
// server and serves requests until Serve returns an error.
func main() {
    fmt.Println("Hello")

    listener, listenErr := net.Listen("tcp", "0.0.0.0:50051")
    if listenErr != nil {
        log.Fatalf("Failed to listen: %v", listenErr)
    }

    // Register the service implementation with gRPC.
    grpcServer := grpc.NewServer()
    greetpb.RegisterGreetServiceServer(grpcServer, &server{})

    // Start the server.
    serveErr := grpcServer.Serve(listener)
    if serveErr != nil {
        log.Fatalf("failed to serve: %v", serveErr)
    }
}
go run greet/greet_server/server.go

lesson14

package main

import (
    "fmt"
    "log"
    "rpc-learn/greet/greetpb"

    "google.golang.org/grpc"
)

// main dials the Greet server on localhost:50051 and creates a client stub
// for it. No RPC is issued yet; the stub is only printed.
func main() {
    fmt.Println("client")
    // WithInsecure: connect without transport security (no TLS).
    conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
    if err != nil {
        log.Fatalf("could not connect: %v", err)
    }

    defer conn.Close()

    c := greetpb.NewGreetServiceClient(conn)
    // %v is the correct verb here: c is an interface value, not a float, so
    // %f would print a "%!f(...)" formatting error instead of the client.
    fmt.Printf("Created client: %v", c)
}

lesson15

一元调用

一元调用特点:

  • 基本的请求响应,交换一条请求和响应
  • 常用
  • 适合数据量小的情况


lesson16

// Greet service definition with a single unary RPC.
syntax = "proto3";

package greet;
// Go package for the generated code. The path is also used as the output
// directory, relative to the --go_out directory passed to protoc.
option go_package="./greet/greetpb";

// Data structures exchanged between the APIs.
// The numbers (1, 2, ...) are field numbers: unique tags that identify each
// field in the encoded message. They are not an ordering or a default value.
message Greeting {
    string first_name = 1;
    string last_name = 2;
}

// Request of the Greet RPC: wraps the Greeting to respond to.
message GreetRequest {
    Greeting greeting = 1;
}

// Response of the Greet RPC: carries the resulting text.
message GreetResponse {
    string result = 1;
}

service GreetService{
    // Unary: one request, one response.
    rpc Greet(GreetRequest) returns (GreetResponse) {};
}

lesson17


在生成的文件里找到接口,这个是服务端要实现的方法

package main

import (
    "context"
    "fmt"
    "log"
    "net"
    "rpc-learn/greet/greetpb"

    "google.golang.org/grpc"
)

type server struct{}

// Greet implements the unary Greet RPC. It logs the request, reads the first
// name from the request's greeting and replies with "Hello <first name>".
func (*server) Greet(ctx context.Context, req *greetpb.GreetRequest) (*greetpb.GreetResponse, error) {
    log.Printf("Greet function was invoked with %v\n", req)

    name := req.GetGreeting().GetFirstName()
    return &greetpb.GreetResponse{Result: "Hello " + name}, nil
}

func main() {
    // ...
}

lesson18

从生成的文件里找到客户端可以调用的方法(GreetServiceClient 接口)

package main

import (
    "context"
    "fmt"
    "log"
    "rpc-learn/greet/greetpb"

    "google.golang.org/grpc"
)

// main dials the Greet server on localhost:50051 without transport security
// and runs the unary example against it.
func main() {
    fmt.Println("client")

    // WithInsecure: connect without transport security (no TLS).
    cc, dialErr := grpc.Dial("localhost:50051", grpc.WithInsecure())
    if dialErr != nil {
        log.Fatalf("could not connect: %v", dialErr)
    }
    defer cc.Close()

    client := greetpb.NewGreetServiceClient(cc)
    doUnary(client)
}

// doUnary sends a single Greet request for "Stephane Maarek" and logs the
// result. Any RPC error terminates the program.
func doUnary(c greetpb.GreetServiceClient) {
    log.Println("Starting to do a Unary RPC...")

    greeting := &greetpb.Greeting{
        FirstName: "Stephane",
        LastName:  "Maarek",
    }
    resp, callErr := c.Greet(context.Background(), &greetpb.GreetRequest{Greeting: greeting})
    if callErr != nil {
        log.Fatalf("error while calling Greet RPC: %v", callErr)
    }
    log.Printf("Response from Greet: %v", resp.Result)
}


lesson19

// Calculator service definition with a single unary Sum RPC.
syntax = "proto3";

package calculator;
// Go package for the generated code. The path is also used as the output
// directory, relative to the --go_out directory passed to protoc.
option go_package = "./calculator/calculatorpb";

// Request of the Sum RPC: the two numbers to add.
message SumRequest {
    int32 first_number = 1;
    int32 second_number = 2;
}

// Response of the Sum RPC: carries the computed sum.
message SumResponse {
    int32 sum_result = 1;
}

service CalculatorService {
    // Unary: one request, one response.
    rpc Sum(SumRequest) returns (SumResponse) {};
}
package main

import (
    "context"
    "fmt"
    "log"
    "net"
    "rpc-learn/calculator/calculatorpb"

    "google.golang.org/grpc"
)

type server struct{}

// Sum implements the unary Sum RPC: it logs the request and returns the sum
// of the request's two numbers.
func (*server) Sum(ctx context.Context, req *calculatorpb.SumRequest) (*calculatorpb.SumResponse, error) {
    log.Printf("Received Sum RPC: %v", req)

    total := req.FirstNumber + req.SecondNumber
    return &calculatorpb.SumResponse{SumResult: total}, nil
}

// main listens on TCP port 50051, registers the Calculator service on a new
// gRPC server and serves requests until Serve returns an error.
func main() {
    fmt.Println("Calculator Server")

    listener, listenErr := net.Listen("tcp", "0.0.0.0:50051")
    if listenErr != nil {
        log.Fatalf("Failed to listen: %v", listenErr)
    }

    // Register the service implementation with gRPC.
    grpcServer := grpc.NewServer()
    calculatorpb.RegisterCalculatorServiceServer(grpcServer, &server{})

    // Start the server.
    serveErr := grpcServer.Serve(listener)
    if serveErr != nil {
        log.Fatalf("failed to serve: %v", serveErr)
    }
}
package main

import (
    "context"
    "fmt"
    "log"
    "rpc-learn/calculator/calculatorpb"

    "google.golang.org/grpc"
)

// main dials the Calculator server on localhost:50051 without transport
// security and runs the unary Sum example against it.
func main() {
    fmt.Println("Calculator Client")

    // WithInsecure: connect without transport security (no TLS).
    cc, dialErr := grpc.Dial("localhost:50051", grpc.WithInsecure())
    if dialErr != nil {
        log.Fatalf("could not connect: %v", dialErr)
    }
    defer cc.Close()

    client := calculatorpb.NewCalculatorServiceClient(cc)
    doUnary(client)
}

// doUnary calls the unary Sum RPC with 56 and 8 and logs the returned sum.
// Any RPC error terminates the program.
func doUnary(c calculatorpb.CalculatorServiceClient) {
    log.Println("Starting to do a Sum Unary RPC...")
    req := &calculatorpb.SumRequest{
        FirstNumber:  56,
        SecondNumber: 8,
    }

    res, err := c.Sum(context.Background(), req)
    if err != nil {
        log.Fatalf("error while calling Sum RPC: %v", err)
    }
    // The message names the RPC that was actually called (Sum, not Greet).
    log.Printf("Response from Sum: %v", res.SumResult)
}


lesson20

流式传输 api

  • 适用于大型数据
  • 发送多个请求包
  • 适用于推送消息


lesson21

// Greet service definition, extended with a server-streaming RPC.
syntax = "proto3";

package greet;
// Go package for the generated code. The path is also used as the output
// directory, relative to the --go_out directory passed to protoc.
option go_package="./greet/greetpb";

// Data structures exchanged between the APIs.
// The numbers (1, 2, ...) are field numbers: unique tags that identify each
// field in the encoded message. They are not an ordering or a default value.
message Greeting {
    string first_name = 1;
    string last_name = 2;
}

// ...

// Request of the GreetManyTimes RPC: wraps the Greeting to respond to.
message GreetManyTimesRequest {
    Greeting greeting = 1;
}

// One message of the GreetManyTimes response stream.
message GreetManyTimesResponse {
    string result = 1;
}

service GreetService{
    // Unary
    rpc Greet(GreetRequest) returns (GreetResponse) {};
    // Server Streaming: one request, a stream of responses.
    rpc GreetManyTimes(GreetManyTimesRequest) returns (stream GreetManyTimesResponse) {};
}

lesson22

package main

import (
    "context"
    "fmt"
    "log"
    "net"
    "rpc-learn/greet/greetpb"
    "strconv"
    "time"

    "google.golang.org/grpc"
)

type server struct{}

func (*server) Greet(ctx context.Context, req *greetpb.GreetRequest) (*greetpb.GreetResponse, error) {
    // ...
}

// GreetManyTimes implements the server-streaming GreetManyTimes RPC. It sends
// ten numbered greetings for the first name in the request, pausing one
// second between messages.
//
// It returns the first error reported by stream.Send (for example when the
// client has gone away), which also stops the loop early; otherwise nil.
func (*server) GreetManyTimes(req *greetpb.GreetManyTimesRequest, stream greetpb.GreetService_GreetManyTimesServer) error {
    log.Printf("GreetManyTimes function was invoked with %v\n", req)
    firstName := req.GetGreeting().GetFirstName()
    for i := 0; i < 10; i++ {
        res := &greetpb.GreetManyTimesResponse{
            // Separate the words with spaces: "Hello <name> number <i>".
            Result: "Hello " + firstName + " number " + strconv.Itoa(i),
        }
        if err := stream.Send(res); err != nil {
            // Stop streaming once the message can no longer be delivered.
            return err
        }
        time.Sleep(1000 * time.Millisecond)
    }
    return nil
}

func main() {
    // ...
}

lesson23

package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "rpc-learn/greet/greetpb"

    "google.golang.org/grpc"
)

// main dials the Greet server on localhost:50051 without transport security
// and runs the server-streaming example against it.
func main() {
    fmt.Println("client")

    // WithInsecure: connect without transport security (no TLS).
    cc, dialErr := grpc.Dial("localhost:50051", grpc.WithInsecure())
    if dialErr != nil {
        log.Fatalf("could not connect: %v", dialErr)
    }
    defer cc.Close()

    client := greetpb.NewGreetServiceClient(cc)
    doServerStreaming(client)
}

func doUnary(c greetpb.GreetServiceClient) {
    // ...
}

// doServerStreaming calls GreetManyTimes for "Stephane Maarek" and logs every
// message of the response stream until the server closes it (io.EOF). Any
// other error terminates the program.
func doServerStreaming(c greetpb.GreetServiceClient) {
    log.Println("Starting to do a Server Streaming RPC...")

    greeting := &greetpb.Greeting{
        FirstName: "Stephane",
        LastName:  "Maarek",
    }
    responses, callErr := c.GreetManyTimes(
        context.Background(),
        &greetpb.GreetManyTimesRequest{Greeting: greeting},
    )
    if callErr != nil {
        log.Fatalf("error while calling GreetManyTimes RPC: %v", callErr)
    }

    for {
        item, recvErr := responses.Recv()
        switch {
        case recvErr == io.EOF:
            // The server finished sending.
            return
        case recvErr != nil:
            log.Fatalf("error while reading stream: %v", recvErr)
        }
        log.Printf("Response from GreetManyTimes: %v", item.GetResult())
    }
}

lesson24

质因数分解

// Calculator service definition, extended with a server-streaming RPC.
syntax = "proto3";

package calculator;
// Go package for the generated code. The path is also used as the output
// directory, relative to the --go_out directory passed to protoc.
option go_package = "./calculator/calculatorpb";

//...

// Request of the PrimeNumberDecomposition RPC: the number to decompose.
message PrimeNumberDecompositionRequest {
    int64 number = 1;
}

// One message of the response stream: a single prime factor.
message PrimeNumberDecompositionResponse {
    int64 prime_factor = 1;
}

service CalculatorService {
    rpc Sum(SumRequest) returns (SumResponse) {};
    // Server streaming: one request, a stream of responses.
    rpc PrimeNumberDecomposition(PrimeNumberDecompositionRequest)
        returns (stream PrimeNumberDecompositionResponse) {};
}
package main

import (
    "context"
    "fmt"
    "log"
    "net"
    "rpc-learn/calculator/calculatorpb"

    "google.golang.org/grpc"
)

type server struct{}

func (*server) Sum(ctx context.Context, req *calculatorpb.SumRequest) (*calculatorpb.SumResponse, error) {
    // ...
}

// PrimeNumberDecomposition implements the server-streaming
// PrimeNumberDecomposition RPC. It streams the prime factors of the requested
// number one by one using trial division: while the number is divisible by
// the current divisor the divisor is sent and divided out, otherwise the
// divisor is incremented. Numbers of 1 or less produce an empty stream.
//
// It returns the first error reported by stream.Send, which also stops the
// computation early; otherwise nil.
func (*server) PrimeNumberDecomposition(
    req *calculatorpb.PrimeNumberDecompositionRequest,
    stream calculatorpb.CalculatorService_PrimeNumberDecompositionServer) error {
    log.Printf("Received PrimeNumberDecomposition RPC: %v", req)

    number := req.GetNumber()
    divisor := int64(2)

    for number > 1 {
        if number%divisor == 0 {
            err := stream.Send(&calculatorpb.PrimeNumberDecompositionResponse{
                PrimeFactor: divisor,
            })
            if err != nil {
                // Do not keep factoring for a client that can no longer be reached.
                return err
            }
            number = number / divisor
        } else {
            divisor++
            log.Printf("Divisor has increased to %v", divisor)
        }
    }
    return nil
}

func main() {
    // ...
}
package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "rpc-learn/calculator/calculatorpb"

    "google.golang.org/grpc"
)

// main dials the Calculator server on localhost:50051 without transport
// security and runs the server-streaming example against it.
func main() {
    fmt.Println("Calculator Client")

    // WithInsecure: connect without transport security (no TLS).
    cc, dialErr := grpc.Dial("localhost:50051", grpc.WithInsecure())
    if dialErr != nil {
        log.Fatalf("could not connect: %v", dialErr)
    }
    defer cc.Close()

    client := calculatorpb.NewCalculatorServiceClient(cc)
    doServerStreaming(client)
}

func doUnary(c calculatorpb.CalculatorServiceClient) {
    // ...
}

// doServerStreaming asks the server to decompose 12 into prime factors and
// logs each factor as it arrives, until the server closes the stream
// (io.EOF). Any other error terminates the program.
func doServerStreaming(c calculatorpb.CalculatorServiceClient) {
    log.Println("Starting to do a PrimeNumberDecomposition RPC...")

    factors, callErr := c.PrimeNumberDecomposition(
        context.Background(),
        &calculatorpb.PrimeNumberDecompositionRequest{Number: 12},
    )
    if callErr != nil {
        log.Fatalf("error while calling PrimeNumberDecomposition RPC: %v", callErr)
    }

    for {
        factor, recvErr := factors.Recv()
        switch {
        case recvErr == io.EOF:
            // The server finished sending.
            return
        case recvErr != nil:
            log.Fatalf("error while reading stream: %v", recvErr)
        }
        log.Println(factor.GetPrimeFactor())
    }
}


lesson25

客户端流 api

  • 上传文件
  • 同时处理多个数据
  • 客户端向服务端推送


lesson26

// Greet service definition, extended with a client-streaming RPC.
syntax = "proto3";

package greet;
// Go package for the generated code. The path is also used as the output
// directory, relative to the --go_out directory passed to protoc.
option go_package="./greet/greetpb";

// Data structures exchanged between the APIs.
// The numbers (1, 2, ...) are field numbers: unique tags that identify each
// field in the encoded message. They are not an ordering or a default value.
message Greeting {
    string first_name = 1;
    string last_name = 2;
}

// ...

// One message of the LongGreet request stream.
message LongGreetRequest {
    Greeting greeting = 1;
}

// Single response of the LongGreet RPC.
message LongGreetResponse {
    string result = 1;
}

service GreetService{
    // Unary
    rpc Greet(GreetRequest) returns (GreetResponse) {};
    // Server Streaming
    rpc GreetManyTimes(GreetManyTimesRequest) returns (stream GreetManyTimesResponse) {};
    // Client Streaming: a stream of requests, one response.
    rpc LongGreet(stream LongGreetRequest) returns (LongGreetResponse) {};
}

lesson27

package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "net"
    "rpc-learn/greet/greetpb"
    "strconv"
    "time"

    "google.golang.org/grpc"
)

type server struct{}

func (*server) Greet(ctx context.Context, req *greetpb.GreetRequest) (*greetpb.GreetResponse, error) {
    // ...
}

func (*server) GreetManyTimes(req *greetpb.GreetManyTimesRequest, stream greetpb.GreetService_GreetManyTimesServer) error {
    // ...
}

// LongGreet implements the client-streaming LongGreet RPC. It appends
// "Hello <first name>! " to the result for every request received and, once
// the client closes its side of the stream (io.EOF), sends the accumulated
// text back as the single response.
//
// A read error is logged and returned to gRPC so that only this call fails;
// calling log.Fatalf here would shut down the whole server process.
func (*server) LongGreet(stream greetpb.GreetService_LongGreetServer) error {
    log.Println("LongGreet function was invoked with a streaming request")
    result := ""

    for {
        req, err := stream.Recv()
        if err == io.EOF {
            // The client has finished sending: reply and close the stream.
            return stream.SendAndClose(&greetpb.LongGreetResponse{
                Result: result,
            })
        }
        if err != nil {
            log.Printf("Error while reading client stream: %v", err)
            return err
        }
        firstName := req.GetGreeting().GetFirstName()
        result += "Hello " + firstName + "! "
    }
}

// main listens on TCP port 50051, registers the Greet service on a new gRPC
// server and serves requests until Serve returns an error.
func main() {
    fmt.Println("Hello")

    listener, listenErr := net.Listen("tcp", "0.0.0.0:50051")
    if listenErr != nil {
        log.Fatalf("Failed to listen: %v", listenErr)
    }

    // Register the service implementation with gRPC.
    grpcServer := grpc.NewServer()
    greetpb.RegisterGreetServiceServer(grpcServer, &server{})

    // Start the server.
    serveErr := grpcServer.Serve(listener)
    if serveErr != nil {
        log.Fatalf("failed to serve: %v", serveErr)
    }
}

lesson28

package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "rpc-learn/greet/greetpb"
    "time"

    "google.golang.org/grpc"
)

// main dials the Greet server on localhost:50051 without transport security
// and runs the client-streaming example against it.
func main() {
    fmt.Println("client")

    // WithInsecure: connect without transport security (no TLS).
    cc, dialErr := grpc.Dial("localhost:50051", grpc.WithInsecure())
    if dialErr != nil {
        log.Fatalf("could not connect: %v", dialErr)
    }
    defer cc.Close()

    client := greetpb.NewGreetServiceClient(cc)
    doClientStreaming(client)
}

func doUnary(c greetpb.GreetServiceClient) {
    // ...
}

func doServerStreaming(c greetpb.GreetServiceClient) {
    // ...
}

// doClientStreaming opens a LongGreet stream, sends one request per first
// name with a 100ms pause between them, then closes the stream and logs the
// server's single response.
//
// If a Send fails, sending stops immediately; the stream's final status is
// then reported by CloseAndRecv, whose error terminates the program.
func doClientStreaming(c greetpb.GreetServiceClient) {
    log.Println("Starting to do a Client Streaming RPC...")

    firstNames := []string{"Stephane", "John", "Lucy", "Mark", "Piper"}

    stream, err := c.LongGreet(context.Background())
    if err != nil {
        log.Fatalf("error while calling LongGreet: %v", err)
    }

    for _, firstName := range firstNames {
        req := &greetpb.LongGreetRequest{
            Greeting: &greetpb.Greeting{FirstName: firstName},
        }
        log.Printf("Sending req: %v\n", req)
        if err := stream.Send(req); err != nil {
            // The stream is broken; further sends would fail as well.
            log.Printf("error while sending req: %v", err)
            break
        }
        time.Sleep(100 * time.Millisecond)
    }

    res, err := stream.CloseAndRecv()
    if err != nil {
        log.Fatalf("error while receiving response from LongGreet: %v", err)
    }
    log.Println("LongGreet Response:", res)
}

lesson29

// Calculator service definition, extended with a client-streaming RPC.
syntax = "proto3";

package calculator;
// Go package for the generated code. The path is also used as the output
// directory, relative to the --go_out directory passed to protoc.
option go_package = "./calculator/calculatorpb";

// ...

// One message of the ComputeAverage request stream: a single number.
message ComputeAverageRequest {
    int32 number = 1;
}

// Single response of the ComputeAverage RPC.
message ComputeAverageResponse {
    double average = 1;
}

service CalculatorService {
    // ...
    // Client streaming: a stream of requests, one response.
    rpc ComputeAverage(stream ComputeAverageRequest) returns (ComputeAverageResponse) {};
}
package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "net"
    "rpc-learn/calculator/calculatorpb"

    "google.golang.org/grpc"
)

type server struct{}

func (*server) Sum(ctx context.Context, req *calculatorpb.SumRequest) (*calculatorpb.SumResponse, error) {
    // ...
}

func (*server) PrimeNumberDecomposition(
    req *calculatorpb.PrimeNumberDecompositionRequest,
    stream calculatorpb.CalculatorService_PrimeNumberDecompositionServer) error {
    // ...
}

// ComputeAverage implements the client-streaming ComputeAverage RPC. It adds
// up every number received and, once the client closes its side of the
// stream (io.EOF), replies with the arithmetic mean.
//
// Errors:
//   - a read error is logged and returned, so only this call fails instead
//     of the whole server process exiting;
//   - a stream that ends before any number was received returns an error,
//     because the mean of zero values is undefined (0/0 would yield NaN).
func (*server) ComputeAverage(
    stream calculatorpb.CalculatorService_ComputeAverageServer) error {
    log.Println("Received ComputeAverage RPC")

    // Accumulate in 64 bits so that many large int32 values cannot overflow.
    sum := int64(0)
    count := 0

    for {
        req, err := stream.Recv()
        if err == io.EOF {
            if count == 0 {
                return fmt.Errorf("cannot compute the average of an empty stream")
            }
            average := float64(sum) / float64(count)
            return stream.SendAndClose(&calculatorpb.ComputeAverageResponse{
                Average: average,
            })
        }
        if err != nil {
            log.Printf("Error while reading client stream: %v", err)
            return err
        }
        sum += int64(req.GetNumber())
        count++
    }
}
func main() {
    // ...
}
package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "rpc-learn/calculator/calculatorpb"

    "google.golang.org/grpc"
)

// main dials the Calculator server on localhost:50051 without transport
// security and runs the client-streaming example against it.
func main() {
    fmt.Println("Calculator Client")

    // WithInsecure: connect without transport security (no TLS).
    cc, dialErr := grpc.Dial("localhost:50051", grpc.WithInsecure())
    if dialErr != nil {
        log.Fatalf("could not connect: %v", dialErr)
    }
    defer cc.Close()

    client := calculatorpb.NewCalculatorServiceClient(cc)
    doClientStreaming(client)
}

func doUnary(c calculatorpb.CalculatorServiceClient) {
    // ...
}

func doServerStreaming(c calculatorpb.CalculatorServiceClient) {
    // ...
}

// doClientStreaming opens a ComputeAverage stream, sends a fixed list of
// numbers, closes the stream and logs the average returned by the server.
//
// If a Send fails, sending stops immediately; the stream's final status is
// then reported by CloseAndRecv, whose error terminates the program.
func doClientStreaming(c calculatorpb.CalculatorServiceClient) {
    log.Println("Starting to do a ComputeAverage Client Streaming RPC...")

    stream, err := c.ComputeAverage(context.Background())
    if err != nil {
        log.Fatalf("Error while opening stream: %v", err)
    }

    numbers := []int32{3, 5, 9, 54, 23}

    for _, number := range numbers {
        log.Printf("Sending number: %v\n", number)
        err := stream.Send(&calculatorpb.ComputeAverageRequest{
            Number: number,
        })
        if err != nil {
            // The stream is broken; further sends would fail as well.
            log.Printf("Error while sending number: %v", err)
            break
        }
    }

    res, err := stream.CloseAndRecv()
    if err != nil {
        log.Fatalf("Error while receiving response: %v", err)
    }

    log.Println("The Average is:", res.GetAverage())
}

lesson30



lesson31

// Greet service definition, extended with a bidirectional-streaming RPC.
syntax = "proto3";

package greet;
// Go package for the generated code. The path is also used as the output
// directory, relative to the --go_out directory passed to protoc.
option go_package="./greet/greetpb";

// Data structures exchanged between the APIs.
// The numbers (1, 2, ...) are field numbers: unique tags that identify each
// field in the encoded message. They are not an ordering or a default value.
message Greeting {
    string first_name = 1;
    string last_name = 2;
}

// ...

// One message of the GreetEveryone request stream.
message GreetEveryoneRequest {
    Greeting greeting = 1;
}

// One message of the GreetEveryone response stream.
message GreetEveryoneResponse {
    string result = 1;
}

service GreetService{
    // Unary
    rpc Greet(GreetRequest) returns (GreetResponse) {};
    // Server Streaming
    rpc GreetManyTimes(GreetManyTimesRequest) returns (stream GreetManyTimesResponse) {};
    // Client Streaming
    rpc LongGreet(stream LongGreetRequest) returns (LongGreetResponse) {};
    // BiDi Streaming: a stream of requests and a stream of responses.
    rpc GreetEveryone(stream GreetEveryoneRequest) returns (stream GreetEveryoneResponse) {};
}

lesson32

package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "net"
    "rpc-learn/greet/greetpb"
    "strconv"
    "time"

    "google.golang.org/grpc"
)

type server struct{}

func (*server) Greet(ctx context.Context, req *greetpb.GreetRequest) (*greetpb.GreetResponse, error) {
    // ...
}

func (*server) GreetManyTimes(req *greetpb.GreetManyTimesRequest, stream greetpb.GreetService_GreetManyTimesServer) error {
    // ...
}

func (*server) LongGreet(stream greetpb.GreetService_LongGreetServer) error {
    // ...
}

// GreetEveryone implements the bidirectional-streaming GreetEveryone RPC. For
// every request received it immediately sends back "Hello <first name>! ",
// and it returns nil once the client closes its side of the stream (io.EOF).
//
// Receive and send errors are logged and returned to gRPC so that only this
// call fails; calling log.Fatalf here would shut down the whole server and
// make the return statements unreachable.
func (*server) GreetEveryone(stream greetpb.GreetService_GreetEveryoneServer) error {
    log.Println("GreetEveryone function was invoked with a streaming request")

    for {
        req, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            log.Printf("Error while reading client stream: %v", err)
            return err
        }
        // Use the generated getters: they tolerate a request without a
        // greeting, where req.Greeting.FirstName would panic on a nil pointer.
        firstName := req.GetGreeting().GetFirstName()
        result := "Hello " + firstName + "! "
        err = stream.Send(&greetpb.GreetEveryoneResponse{
            Result: result,
        })
        if err != nil {
            log.Printf("Error while sending data to client: %v", err)
            return err
        }
    }
}

func main() {
    // ...
}

lesson33

package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "rpc-learn/greet/greetpb"
    "time"

    "google.golang.org/grpc"
)

// main dials the Greet server on localhost:50051 without transport security
// and runs the bidirectional-streaming example against it.
func main() {
    fmt.Println("client")

    // WithInsecure: connect without transport security (no TLS).
    cc, dialErr := grpc.Dial("localhost:50051", grpc.WithInsecure())
    if dialErr != nil {
        log.Fatalf("could not connect: %v", dialErr)
    }
    defer cc.Close()

    client := greetpb.NewGreetServiceClient(cc)
    doBiDiStreaming(client)
}

func doUnary(c greetpb.GreetServiceClient) {
    // ...
}

func doServerStreaming(c greetpb.GreetServiceClient) {
    // ...
}

func doClientStreaming(c greetpb.GreetServiceClient) {
    // ...
}

// doBiDiStreaming opens a GreetEveryone stream and runs two goroutines: one
// sends a request per first name (one second apart) and then closes the send
// side, the other logs every response until the server closes the stream
// (io.EOF). The function returns when the receiving goroutine has finished.
//
// Failing to open the stream or a receive error terminates the program; a
// send error stops the sender and closes the send side.
func doBiDiStreaming(c greetpb.GreetServiceClient) {
    log.Println("Starting to do a BiDi Streaming RPC...")

    firstNames := []string{"Stephane", "John", "Lucy", "Mark", "Piper"}

    // Invoking the RPC on the client creates the stream.
    stream, err := c.GreetEveryone(context.Background())
    if err != nil {
        log.Fatalf("error while creating stream: %v", err)
    }

    // waitc is closed by the receiver once the response stream has ended.
    waitc := make(chan struct{})

    // Sender: pushes the requests to the server.
    go func() {
        for _, firstName := range firstNames {
            req := &greetpb.GreetEveryoneRequest{
                Greeting: &greetpb.Greeting{FirstName: firstName},
            }
            log.Println("Sending message:", req)
            if err := stream.Send(req); err != nil {
                // The stream is broken; further sends would fail as well.
                log.Printf("error while sending: %v", err)
                break
            }
            time.Sleep(1000 * time.Millisecond)
        }
        if err := stream.CloseSend(); err != nil {
            log.Printf("error while closing send side: %v", err)
        }
    }()

    // Receiver: reads the responses coming back from the server.
    go func() {
        defer close(waitc)
        for {
            res, err := stream.Recv()
            if err == io.EOF {
                return
            }
            if err != nil {
                log.Fatalf("error while receiving: %v", err)
            }
            log.Println("Received:", res.GetResult())
        }
    }()

    // Block until the receiver is done.
    <-waitc
}

lesson34

// Calculator service definition, extended with a bidirectional-streaming RPC.
syntax = "proto3";

package calculator;
// Go package for the generated code. The path is also used as the output
// directory, relative to the --go_out directory passed to protoc.
option go_package = "./calculator/calculatorpb";

// ...

// One message of the FindMaxium request stream: a single number.
message FindMaxiumRequest {
    int32 number = 1;
}

// One message of the FindMaxium response stream.
// NOTE(review): "Maxium" looks like a misspelling of "Maximum"; renaming it
// would change the generated Go API, so it is only flagged here.
message FindMaxiumResponse {
    int32 maxium = 1;
}

service CalculatorService {
    // ...
    // BiDi streaming: a stream of requests and a stream of responses.
    rpc FindMaxium(stream FindMaxiumRequest) returns (stream FindMaxiumResponse) {};
}
package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "net"
    "rpc-learn/calculator/calculatorpb"

    "google.golang.org/grpc"
)

type server struct{}

func (*server) Sum(ctx context.Context, req *calculatorpb.SumRequest) (*calculatorpb.SumResponse, error) {
    // ...
}

func (*server) PrimeNumberDecomposition(
    req *calculatorpb.PrimeNumberDecompositionRequest,
    stream calculatorpb.CalculatorService_PrimeNumberDecompositionServer) error {
    // ...
}

func (*server) ComputeAverage(
    stream calculatorpb.CalculatorService_ComputeAverageServer) error {
    // ...
}

// FindMaxium implements the bidirectional-streaming FindMaxium RPC. It keeps
// a running maximum of the numbers received and sends it to the client each
// time a new maximum is seen. It returns nil when the client closes its side
// of the stream (io.EOF).
//
// The first number received is always reported as the initial maximum.
// Seeding the maximum with 0 would silently ignore streams that contain only
// negative numbers (and a leading 0).
//
// Receive and send errors are logged and returned to gRPC so that only this
// call fails instead of the whole server process exiting.
func (*server) FindMaxium(stream calculatorpb.CalculatorService_FindMaxiumServer) error {
    log.Println("Received FindMaxium RPC")
    maxium := int32(0)
    seen := false // true once at least one number has been received

    for {
        req, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            log.Printf("Error while reading client stream: %v", err)
            return err
        }
        number := req.GetNumber()
        if seen && number <= maxium {
            continue
        }
        seen = true
        maxium = number
        err = stream.Send(&calculatorpb.FindMaxiumResponse{
            Maxium: maxium,
        })
        if err != nil {
            log.Printf("Error while sending client stream: %v", err)
            return err
        }
    }
}

func main() {
    // ...
}
package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "rpc-learn/calculator/calculatorpb"
    "time"

    "google.golang.org/grpc"
)

// main dials the Calculator server on localhost:50051 without transport
// security and runs the bidirectional-streaming example against it.
func main() {
    fmt.Println("Calculator Client")

    // WithInsecure: connect without transport security (no TLS).
    cc, dialErr := grpc.Dial("localhost:50051", grpc.WithInsecure())
    if dialErr != nil {
        log.Fatalf("could not connect: %v", dialErr)
    }
    defer cc.Close()

    client := calculatorpb.NewCalculatorServiceClient(cc)
    doBiDiStreaming(client)
}

func doUnary(c calculatorpb.CalculatorServiceClient) {
    // ...
}

func doServerStreaming(c calculatorpb.CalculatorServiceClient) {
    // ...
}

func doClientStreaming(c calculatorpb.CalculatorServiceClient) {
    // ...
}

// doBiDiStreaming opens a FindMaxium stream and runs two goroutines: one
// sends a fixed list of numbers (one second apart) and then closes the send
// side, the other logs every new maximum reported by the server until the
// stream ends (io.EOF). The function returns when the receiver has finished.
//
// Failing to open the stream or a receive error terminates the program; a
// send error stops the sender and closes the send side.
func doBiDiStreaming(c calculatorpb.CalculatorServiceClient) {
    log.Println("Starting to do a FindMaxium BiDi Streaming RPC...")

    stream, err := c.FindMaxium(context.Background())
    if err != nil {
        log.Fatalf("Error while opening stream and calling FindMaxium: %v", err)
    }

    // waitc is closed by the receiver once the response stream has ended.
    waitc := make(chan struct{})

    // Sender goroutine.
    go func() {
        numbers := []int32{4, 7, 2, 19, 4, 6, 32}
        for _, number := range numbers {
            log.Println("Sending number:", number)
            err := stream.Send(&calculatorpb.FindMaxiumRequest{
                Number: number,
            })
            if err != nil {
                // The stream is broken; further sends would fail as well.
                log.Printf("Problem while sending number: %v", err)
                break
            }
            time.Sleep(1000 * time.Millisecond)
        }
        if err := stream.CloseSend(); err != nil {
            log.Printf("Problem while closing send side: %v", err)
        }
    }()

    // Receiver goroutine.
    go func() {
        defer close(waitc)
        for {
            res, err := stream.Recv()
            if err == io.EOF {
                return
            }
            if err != nil {
                log.Fatalf("Problem while reading server stream: %v", err)
            }
            maxium := res.GetMaxium()
            log.Println("Received a new maxium of...:", maxium)
        }
    }()

    // Block until the receiver is done.
    <-waitc
}

lesson35



lesson36 – Error

// Calculator service definition, extended with an RPC used to demonstrate
// error handling.
syntax = "proto3";

package calculator;
// Go package for the generated code. The path is also used as the output
// directory, relative to the --go_out directory passed to protoc.
option go_package = "./calculator/calculatorpb";

// ...

// Request of the SquareRoot RPC: the number to take the square root of.
message SquareRootRequest {
    int32 number = 1;
}

// Response of the SquareRoot RPC: the computed square root.
message SquareRootResponse {
    double number_root = 1;
}

service CalculatorService {
    // ...
    // Error handling:
    // this RPC returns an error if the number sent is negative.
    // The error sent is of type INVALID_ARGUMENT.
    rpc SquareRoot(SquareRootRequest) returns (SquareRootResponse) {};
}
package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "math"
    "net"
    "rpc-learn/calculator/calculatorpb"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

type server struct{}

func (*server) Sum(ctx context.Context, req *calculatorpb.SumRequest) (*calculatorpb.SumResponse, error) {
    // ...
}

func (*server) PrimeNumberDecomposition(
    req *calculatorpb.PrimeNumberDecompositionRequest,
    stream calculatorpb.CalculatorService_PrimeNumberDecompositionServer) error {
    // ...
}

func (*server) ComputeAverage(
    stream calculatorpb.CalculatorService_ComputeAverageServer) error {
    //...
}

func (*server) FindMaxium(stream calculatorpb.CalculatorService_FindMaxiumServer) error {
    // ...
}

// SquareRoot implements the unary SquareRoot RPC. It returns the square root
// of the requested number, or an InvalidArgument status error when the
// number is negative.
func (*server) SquareRoot(
    ctx context.Context, req *calculatorpb.SquareRootRequest) (
    *calculatorpb.SquareRootResponse, error) {
    log.Println("Received SquareRoot RPC")
    number := req.GetNumber()
    if number < 0 {
        // Pass the format and its argument straight to status.Errorf rather
        // than using a pre-formatted string as the format argument.
        return nil, status.Errorf(
            codes.InvalidArgument,
            "Received a negative number: %v", number,
        )
    }
    return &calculatorpb.SquareRootResponse{
        NumberRoot: math.Sqrt(float64(number)),
    }, nil
}

func main() {
    // ...
}
package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "rpc-learn/calculator/calculatorpb"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// main dials the Calculator server on localhost:50051 without transport
// security and runs the error-handling example against it.
func main() {
    fmt.Println("Calculator Client")

    // WithInsecure: connect without transport security (no TLS).
    cc, dialErr := grpc.Dial("localhost:50051", grpc.WithInsecure())
    if dialErr != nil {
        log.Fatalf("could not connect: %v", dialErr)
    }
    defer cc.Close()

    client := calculatorpb.NewCalculatorServiceClient(cc)
    doErrorUnary(client)
}

func doUnary(c calculatorpb.CalculatorServiceClient) {
    // ...
}

func doServerStreaming(c calculatorpb.CalculatorServiceClient) {
    // ...
}

func doClientStreaming(c calculatorpb.CalculatorServiceClient) {
    // ...
}

func doBiDiStreaming(c calculatorpb.CalculatorServiceClient) {
    // ...
}

// doErrorUnary exercises the SquareRoot RPC twice: once with a valid number
// (10) and once with a negative number (-2), which the server in this lesson
// rejects.
func doErrorUnary(c calculatorpb.CalculatorServiceClient) {
    // Log message typo fixed: "Starting to do", matching the other examples.
    log.Println("Starting to do a SquareRoot Unary RPC...")

    doErrorCall(c, 10)
    doErrorCall(c, -2)
}

// doErrorCall calls the SquareRoot RPC with n and logs the outcome.
//
// On success the square root is logged. On failure the error is inspected:
// a gRPC status error has its message and code logged (with an extra hint
// for InvalidArgument), while any other error terminates the program. No
// result is logged after a failed call, because the response is nil.
func doErrorCall(c calculatorpb.CalculatorServiceClient, n int32) {
    res, err := c.SquareRoot(context.Background(), &calculatorpb.SquareRootRequest{Number: n})
    if err != nil {
        respErr, ok := status.FromError(err)
        if !ok {
            // Not a gRPC status error.
            log.Fatalf("Big Error calling SquareRoot: %v", err)
        }
        // Actual error from gRPC (user error).
        log.Println("Error message from server:", respErr.Message())
        log.Println(respErr.Code())
        if respErr.Code() == codes.InvalidArgument {
            log.Println("We probably sent a negative number!")
        }
        return
    }
    // Printf, not Println: the message contains formatting verbs.
    log.Printf("Result of square root of %v: %v", n, res.GetNumberRoot())
}

lesson37 – Deadline


lesson38

// Greet service definition, extended with a unary RPC used to demonstrate
// deadlines.
syntax = "proto3";

package greet;
// Go package for the generated code. The path is also used as the output
// directory, relative to the --go_out directory passed to protoc.
option go_package="./greet/greetpb";

// Data structures exchanged between the APIs.
// The numbers (1, 2, ...) are field numbers: unique tags that identify each
// field in the encoded message. They are not an ordering or a default value.
message Greeting {
    string first_name = 1;
    string last_name = 2;
}

// ...

// Request of the GreetWithDeadline RPC: wraps the Greeting to respond to.
message GreetWithDeadlineRequest {
    Greeting greeting = 1;
}

// Response of the GreetWithDeadline RPC: carries the resulting text.
message GreetWithDeadlineResponse {
    string result = 1;
}

service GreetService{
    // ...
    // Unary With Deadline
    rpc GreetWithDeadline(GreetWithDeadlineRequest) returns (GreetWithDeadlineResponse) {};
}
package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "net"
    "rpc-learn/greet/greetpb"
    "strconv"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

type server struct{}

func (*server) Greet(ctx context.Context, req *greetpb.GreetRequest) (*greetpb.GreetResponse, error) {
    // ...
}

func (*server) GreetManyTimes(req *greetpb.GreetManyTimesRequest, stream greetpb.GreetService_GreetManyTimesServer) error {
    // ...
}

func (*server) LongGreet(stream greetpb.GreetService_LongGreetServer) error {
    // ...
}

func (*server) GreetEveryone(stream greetpb.GreetService_GreetEveryoneServer) error {
    // ...
}

// GreetWithDeadline implements the unary GreetWithDeadline RPC. It simulates
// three seconds of work in one-second steps and then replies with
// "Hello <first name>".
//
// While working it watches the request context. If the context ends first,
// the call stops immediately and returns a status error that matches the
// cause: DeadlineExceeded when the deadline passed, Canceled when the client
// canceled the request.
func (*server) GreetWithDeadline(
    ctx context.Context, req *greetpb.GreetWithDeadlineRequest) (
    *greetpb.GreetWithDeadlineResponse, error) {
    log.Printf("GreetWithDeadline function was invoked with %v\n", req)
    for i := 0; i < 3; i++ {
        select {
        case <-ctx.Done():
            if ctx.Err() == context.DeadlineExceeded {
                log.Println("The deadline was exceeded!")
                return nil, status.Error(codes.DeadlineExceeded, "the deadline was exceeded")
            }
            log.Println("The client canceled the request!")
            return nil, status.Error(codes.Canceled, "the client canceled the request")
        case <-time.After(1 * time.Second):
            // One simulated unit of work finished; keep going.
        }
    }
    firstName := req.GetGreeting().GetFirstName()
    result := "Hello " + firstName
    res := &greetpb.GreetWithDeadlineResponse{
        Result: result,
    }
    return res, nil
}

func main() {
    // ...
}
package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "rpc-learn/greet/greetpb"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// main dials the local greet server over an insecure connection and issues
// the deadline-aware unary call twice: once with a 5s timeout and once with a
// 1s timeout.
func main() {
    fmt.Println("client")

    // WithInsecure: plaintext connection, no transport security.
    cc, dialErr := grpc.Dial("localhost:50051", grpc.WithInsecure())
    if dialErr != nil {
        log.Fatalf("could not connect: %v", dialErr)
    }
    defer cc.Close()

    client := greetpb.NewGreetServiceClient(cc)
    for _, timeout := range []time.Duration{5 * time.Second, 1 * time.Second} {
        doUnaryWithDetail(client, timeout)
    }
}

func doUnary(c greetpb.GreetServiceClient) {
    // ...
}

func doServerStreaming(c greetpb.GreetServiceClient) {
    // ...
}

func doClientStreaming(c greetpb.GreetServiceClient) {
    // ...
}

func doBiDiStreaming(c greetpb.GreetServiceClient) {
    // ...
}

// doUnaryWithDetail calls GreetWithDeadline with a context that expires after
// timeout and logs either the response or the reason the call failed.
// A non-gRPC error terminates the program via log.Fatalf.
func doUnaryWithDetail(c greetpb.GreetServiceClient, timeout time.Duration) {
    log.Println("Starting to do a UnaryWithDetail RPC...")

    greeting := &greetpb.Greeting{
        FirstName: "Stephane",
        LastName:  "Maarek",
    }
    req := &greetpb.GreetWithDeadlineRequest{Greeting: greeting}

    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel() // release the timer when the call finishes or times out

    res, err := c.GreetWithDeadline(ctx, req)
    if err == nil {
        log.Printf("Response from GreetWithDetail: %v", res.Result)
        return
    }

    st, isStatus := status.FromError(err)
    switch {
    case !isStatus:
        // Not a gRPC status error at all.
        log.Fatalf("error while calling GreetWithDetail RPC: %v", err)
    case st.Code() == codes.DeadlineExceeded:
        log.Println("Timeout was hit! Deadline was exceeded")
    default:
        log.Println("error while calling GreetWithDeadline RPC:", err)
    }
}

lesson39 – SSL






lesson40


#!/bin/bash
# Generates a throwaway certificate authority and a server certificate for
# local gRPC TLS testing. Outputs: ca.key, ca.crt, server.key, server.csr,
# server.crt and server.pem in the current directory.
#
# Abort on the first failing command so a half-generated set of files is
# never mistaken for a usable one.
set -euo pipefail

# Host name the server certificate is issued for.
SERVER_CN=localhost

# Step 1: CA private key and self-signed CA certificate (ca.crt is what
# clients trust).
openssl genrsa -passout pass:1111 -des3 -out ca.key 4096
openssl req -passin pass:1111 -new -x509 -days 365 -key ca.key -out ca.crt -subj "/CN=${SERVER_CN}"

# Step 2: server private key.
openssl genrsa -passout pass:1111 -des3 -out server.key 4096

# Step 3: certificate signing request for the server key.
openssl req -passin pass:1111 -new -key server.key -out server.csr -subj "/CN=${SERVER_CN}"

# Step 4: sign the request with the CA. The subjectAltName extension is
# required: Go 1.15+ rejects certificates that only carry a Common Name.
openssl x509 -req -passin pass:1111 -days 365 -in server.csr -CA ca.crt -CAkey ca.key -set_serial 01 -out server.crt \
    -extfile <(printf "subjectAltName=DNS:%s" "${SERVER_CN}")

# Step 5: convert the server key to unencrypted PKCS#8 (server.pem), the
# format the gRPC server loads.
openssl pkcs8 -topk8 -nocrypt -passin pass:1111 -in server.key -out server.pem

用不了,跳

lesson41

java 的 grpc 出问题,不搞了

lesson42–reflection&cli


import    "google.golang.org/grpc/reflection"


evans -p 50051 -r
show package
show service
show message
# 展示message详细信息
desc SumRequest
# 切换到包下
package calculator
# 切换到服务
service CalculatorService
# 调用服务
call Sum






lesson43–grpc with MongoDB


lesson44

lesson45

syntax = "proto3";

package blog;

option go_package = "./blog/blogpb";

// A single blog post as exchanged over the API.
message Blog {
    string id = 1;
    string author_id = 2;
    string title = 3;
    string content = 4;
}

// Blog service; no RPCs are declared yet at this point in the notes.
service BlogService {

}
package main

import (
    "fmt"
    "os"
    "os/signal"
    "rpc-learn/blog/blogpb"
    "log"
    "net"
    "google.golang.org/grpc"
)

// server is the type registered with the gRPC server as the BlogService
// implementation in main.
type server struct{}

// main starts the BlogService gRPC server on localhost:50051 in a background
// goroutine and shuts it down when an interrupt signal (Ctrl-C) arrives.
func main() {
    // Include timestamp plus file name and line number in every log line,
    // which helps locate the failure if the program crashes.
    log.SetFlags(log.LstdFlags | log.Lshortfile)

    fmt.Println("Blog server started")

    listener, listenErr := net.Listen("tcp", "localhost:50051")
    if listenErr != nil {
        log.Fatalf("Failed to listen: %v", listenErr)
    }

    var serverOpts []grpc.ServerOption
    grpcServer := grpc.NewServer(serverOpts...)
    // Register the service implementation with gRPC.
    blogpb.RegisterBlogServiceServer(grpcServer, &server{})

    // Serve in the background so this goroutine can wait for the signal.
    serve := func() {
        if serveErr := grpcServer.Serve(listener); serveErr != nil {
            log.Fatalf("failed to serve: %v", serveErr)
        }
    }
    go serve()

    // Block until Ctrl-C is received.
    interrupts := make(chan os.Signal, 1)
    signal.Notify(interrupts, os.Interrupt)
    <-interrupts

    log.Println("Stopping the server")
    grpcServer.Stop()
    log.Println("Stopping the listener")
    listener.Close()
    log.Println("End of Program")
}

lesson46

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "rpc-learn/blog/blogpb"

    // "io"
    "log"
    "net"

    // "strconv"
    // "time"
    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
    "google.golang.org/grpc"
    // "google.golang.org/grpc/codes"
    // "google.golang.org/grpc/status"
)

// server is the type registered with the gRPC server as the BlogService
// implementation in main.
type server struct{}

// blogItem is the shape of a blog document as stored in MongoDB.
// The bson tags give the document field name for each struct field;
// omitempty on _id leaves the field out of the document when ID is empty.
type blogItem struct {
    ID       string `bson:"_id,omitempty"`
    AuthorID string `bson:"author_id"` // closing quote was missing, which made the tag malformed and ignored
    Content  string `bson:"content"`
    Title    string `bson:"title"`
}

// main connects to the local MongoDB deployment, verifies the connection with
// a ping command, then serves BlogService over gRPC on localhost:50051 until
// an interrupt signal (Ctrl-C) arrives. On shutdown it stops the gRPC server,
// closes the listener and disconnects from MongoDB.
func main() {
    // if we crash the go code, we get the file name and line number
    log.SetFlags(log.LstdFlags | log.Lshortfile)

    fmt.Println("Blog server started")

    serverAPI := options.ServerAPI(options.ServerAPIVersion1)
    mongoOpts := options.Client().ApplyURI("mongodb://localhost:27017").SetServerAPIOptions(serverAPI)
    client, err := mongo.Connect(context.TODO(), mongoOpts)
    if err != nil {
        panic(err)
    }

    // Connect does not guarantee the deployment is reachable; a ping does.
    var result bson.M
    ping := bson.D{{Key: "ping", Value: 1}}
    if err := client.Database("admin").RunCommand(context.TODO(), ping).Decode(&result); err != nil {
        panic(err)
    }
    fmt.Println("Pinged your deployment. You successfully connected to MongoDB!")

    lis, err := net.Listen("tcp", "localhost:50051")
    if err != nil {
        log.Fatalf("Failed to listen: %v", err)
    }

    opts := []grpc.ServerOption{}
    s := grpc.NewServer(opts...)
    // Register the service implementation with gRPC.
    blogpb.RegisterBlogServiceServer(s, &server{})

    go func() {
        // Start serving; runs until the server is stopped.
        if err := s.Serve(lis); err != nil {
            log.Fatalf("failed to serve: %v", err)
        }
    }()

    // wait for ctrl-c to exit
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, os.Interrupt)

    // block until a signal is received
    <-ch
    log.Println("Stopping the server")
    s.Stop()
    log.Println("Stopping the listener")
    lis.Close()
    log.Println("Closing MongoDB Connection")
    if err := client.Disconnect(context.TODO()); err != nil {
        // Report instead of silently dropping the error; we are exiting anyway.
        log.Printf("Error while closing MongoDB connection: %v", err)
    }
    log.Println("End of Program")
}

lesson47

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "rpc-learn/blog/blogpb"

    // "io"
    "log"
    "net"

    // "strconv"
    // "time"
    "go.mongodb.org/mongo-driver/bson/primitive"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// blogItem is the shape of a blog document as stored in MongoDB.
// The bson tags give the document field name for each struct field;
// omitempty on _id leaves the field out of the document when ID is the zero
// ObjectID.
type blogItem struct {
    ID       primitive.ObjectID `bson:"_id,omitempty"`
    AuthorID string             `bson:"author_id"` // closing quote was missing, which made the tag malformed and ignored
    Content  string             `bson:"content"`
    Title    string             `bson:"title"`
}

// coll is the package-level MongoDB collection handle used by the handlers;
// it is assigned in main.
var coll *mongo.Collection

// server is the receiver type for the BlogService handlers.
type server struct{}

// CreateBlog inserts the blog from the request into the collection and
// returns it with the generated id filled in.
//
// It returns codes.Internal when the insert fails or when the inserted id is
// not an ObjectID.
func (*server) CreateBlog(ctx context.Context, req *blogpb.CreateBlogRequest) (*blogpb.CreateBlogResponse, error) {
    blog := req.GetBlog()

    // ID is left at its zero value; the omitempty tag on _id omits it from
    // the inserted document.
    data := blogItem{
        AuthorID: blog.GetAuthorId(),
        Title:    blog.GetTitle(),
        Content:  blog.GetContent(),
    }

    // Use the request context so the insert is abandoned when the caller
    // cancels or its deadline expires.
    res, err := coll.InsertOne(ctx, data)
    if err != nil {
        return nil, status.Errorf(codes.Internal, "Internal error: %v", err)
    }
    oid, ok := res.InsertedID.(primitive.ObjectID)
    if !ok {
        return nil, status.Error(codes.Internal, "Cannot convert to OID")
    }
    return &blogpb.CreateBlogResponse{
        Blog: &blogpb.Blog{
            Id:       oid.Hex(),
            AuthorId: blog.GetAuthorId(),
            Title:    blog.GetTitle(),
            Content:  blog.GetContent(),
        },
    }, nil
}

// main connects to the local MongoDB deployment, binds the package-level
// collection handle, then serves BlogService over gRPC on localhost:50051
// until an interrupt signal (Ctrl-C) arrives. On shutdown it stops the gRPC
// server, closes the listener and disconnects from MongoDB.
func main() {
    // if we crash the go code, we get the file name and line number
    log.SetFlags(log.LstdFlags | log.Lshortfile)

    fmt.Println("Blog server started")

    serverAPI := options.ServerAPI(options.ServerAPIVersion1)
    mongoOpts := options.Client().ApplyURI("mongodb://localhost:27017").SetServerAPIOptions(serverAPI)
    client, err := mongo.Connect(context.TODO(), mongoOpts)
    if err != nil {
        panic(err)
    }

    // Actually ping before claiming success below; Connect alone does not
    // prove the deployment is reachable. A nil read preference makes Ping
    // use the client's default.
    if err := client.Ping(context.TODO(), nil); err != nil {
        panic(err)
    }
    coll = client.Database("sample_mflix").Collection("blogs")

    fmt.Println("Pinged your deployment. You successfully connected to MongoDB!")

    lis, err := net.Listen("tcp", "localhost:50051")
    if err != nil {
        log.Fatalf("Failed to listen: %v", err)
    }

    opts := []grpc.ServerOption{}
    s := grpc.NewServer(opts...)
    // Register the service implementation with gRPC.
    blogpb.RegisterBlogServiceServer(s, &server{})

    go func() {
        // Start serving; runs until the server is stopped.
        if err := s.Serve(lis); err != nil {
            log.Fatalf("failed to serve: %v", err)
        }
    }()

    // wait for ctrl-c to exit
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, os.Interrupt)

    // block until a signal is received
    <-ch
    log.Println("Stopping the server")
    s.Stop()
    log.Println("Stopping the listener")
    lis.Close()
    log.Println("Closing MongoDB Connection")
    if err := client.Disconnect(context.TODO()); err != nil {
        // Report instead of silently dropping the error; we are exiting anyway.
        log.Printf("Error while closing MongoDB connection: %v", err)
    }
    log.Println("End of Program")
}

lesson48

package main

import (
    "context"
    "fmt"
    // "io"
    "log"
    "rpc-learn/blog/blogpb"
    // "time"

    "google.golang.org/grpc"
    // "google.golang.org/grpc/codes"
    // "google.golang.org/grpc/credentials"
    // "google.golang.org/grpc/status"
)

// main dials the blog server over an insecure connection, creates one blog
// and logs the server's response.
func main() {
    fmt.Println("blog client")

    cc, dialErr := grpc.Dial("localhost:50051", grpc.WithInsecure())
    if dialErr != nil {
        log.Fatalf("could not connect: %v", dialErr)
    }
    defer cc.Close()

    client := blogpb.NewBlogServiceClient(cc)

    log.Println("Creating the blog")
    req := &blogpb.CreateBlogRequest{
        Blog: &blogpb.Blog{
            AuthorId: "Stephane",
            Title:    "My First Blog",
            Content:  "Content of the first blog",
        },
    }
    created, createErr := client.CreateBlog(context.Background(), req)
    if createErr != nil {
        log.Fatalf("Unexpected error creating blog: %v", createErr)
    }
    log.Println("Blog has been created:", created)
}

lesson49

syntax = "proto3";

package blog;

option go_package = "./blog/blogpb";

// A single blog post as exchanged over the API.
message Blog {
    string id = 1;
    string author_id = 2;
    string title = 3;
    string content = 4;
}

// ...

// Request for ReadBlog; identifies the blog to fetch by id.
message ReadBlogRequest {
    string blog_id = 1;
}

// Response for ReadBlog; carries the blog that was found.
message ReadBlogResponse {
    Blog blog = 1; // will have a blog id
}

// Blog service with the create and read RPCs.
service BlogService {
    rpc CreateBlog (CreateBlogRequest) returns (CreateBlogResponse);
    // return NOT FOUND if not found
    rpc ReadBlog (ReadBlogRequest) returns (ReadBlogResponse);
}
package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "rpc-learn/blog/blogpb"

    // "io"
    "log"
    "net"

    // "strconv"
    // "time"
    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/bson/primitive"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// blogItem is the shape of a blog document as stored in MongoDB.
// The bson tags give the document field name for each struct field;
// omitempty on _id leaves the field out of the document when ID is the zero
// ObjectID.
type blogItem struct {
    ID       primitive.ObjectID `bson:"_id,omitempty"`
    AuthorID string             `bson:"author_id"` // closing quote was missing, which made the tag malformed and ignored
    Content  string             `bson:"content"`
    Title    string             `bson:"title"`
}

// coll is the package-level MongoDB collection handle used by the handlers.
// NOTE(review): presumably assigned in main, whose body is elided here.
var coll *mongo.Collection

// server is the receiver type for the BlogService handlers.
type server struct{}

func (*server) CreateBlog(ctx context.Context, req *blogpb.CreateBlogRequest) (*blogpb.CreateBlogResponse, error) {
    // ...
}

// ReadBlog looks up a single blog by its hex-encoded ObjectID.
//
// It returns codes.InvalidArgument when the id cannot be parsed,
// codes.NotFound when no document has that id, and codes.Internal for any
// other lookup or decoding failure.
func (*server) ReadBlog(ctx context.Context, req *blogpb.ReadBlogRequest) (*blogpb.ReadBlogResponse, error) {
    log.Println("Read blog request")

    blogID := req.GetBlogId()

    oid, err := primitive.ObjectIDFromHex(blogID)
    if err != nil {
        return nil, status.Error(codes.InvalidArgument, "Cannot parse ID")
    }

    // Destination for the decoded document.
    data := &blogItem{}
    filter := bson.D{{Key: "_id", Value: oid}}

    // Use the request context so the query is abandoned when the caller
    // cancels or its deadline expires.
    if err := coll.FindOne(ctx, filter).Decode(data); err != nil {
        // Only a missing document is NotFound; connection or decoding
        // problems must not be reported as "no such blog".
        if err == mongo.ErrNoDocuments {
            return nil, status.Errorf(codes.NotFound, "Cannot find blog with specified ID %v", err)
        }
        return nil, status.Errorf(codes.Internal, "Cannot read blog: %v", err)
    }

    return &blogpb.ReadBlogResponse{
        Blog: &blogpb.Blog{
            Id:       data.ID.Hex(),
            AuthorId: data.AuthorID,
            Content:  data.Content,
            Title:    data.Title,
        },
    }, nil
}

func main() {
    // ...
}

lesson50

package main

import (
    "context"
    "fmt"
    "log"
    "rpc-learn/blog/blogpb"
    "google.golang.org/grpc"
)

// main dials the blog server over an insecure connection and reads one blog
// by a hard-coded id.
func main() {
    fmt.Println("blog client")

    cc, dialErr := grpc.Dial("localhost:50051", grpc.WithInsecure())
    if dialErr != nil {
        log.Fatalf("could not connect: %v", dialErr)
    }
    defer cc.Close()

    client := blogpb.NewBlogServiceClient(cc)

    // To create a blog instead, call:
    //     create(client, &blogpb.Blog{
    //         AuthorId: "Stephane",
    //         Title:    "My First Blog",
    //         Content:  "Content of the first blog",
    //     })

    // Read an existing blog.
    const blogID = "64a5f45f94466f3e02f74f79"
    findOne(client, blogID)
}

// create sends blog to the server via CreateBlog and logs the response.
// Any RPC error terminates the program via log.Fatalf.
func create(c blogpb.BlogServiceClient, blog *blogpb.Blog) {
    log.Println("Creating the blog")

    req := new(blogpb.CreateBlogRequest)
    req.Blog = blog

    created, rpcErr := c.CreateBlog(context.Background(), req)
    if rpcErr != nil {
        log.Fatalf("Unexpected error creating blog: %v", rpcErr)
    }
    log.Println("Blog has been created:", created)
}

// findOne fetches the blog with the given id via ReadBlog and logs it.
// Any RPC error terminates the program via log.Fatalf.
func findOne(c blogpb.BlogServiceClient, id string) {
    log.Println("Reading the blog")

    req := new(blogpb.ReadBlogRequest)
    req.BlogId = id

    found, rpcErr := c.ReadBlog(context.Background(), req)
    if rpcErr != nil {
        log.Fatalf("Unexpected error reading blog: %v", rpcErr)
    }
    log.Println("Blog has been read:", found)
}

lesson51

syntax = "proto3";

package blog;

option go_package = "./blog/blogpb";

// A single blog post as exchanged over the API.
message Blog {
    string id = 1;
    string author_id = 2;
    string title = 3;
    string content = 4;
}

// ...

// Request for UpdateBlog; carries the blog with its new field values.
message UpdateBlogRequest {
    Blog blog = 1;
}

// Response for UpdateBlog; carries the blog after the update.
message UpdateBlogResponse {
    Blog blog = 1;
}

// Blog service with the create, read and update RPCs.
service BlogService {
    rpc CreateBlog (CreateBlogRequest) returns (CreateBlogResponse);
    // return NOT FOUND if not found
    rpc ReadBlog (ReadBlogRequest) returns (ReadBlogResponse);
    rpc UpdateBlog (UpdateBlogRequest) returns (UpdateBlogResponse);
}
package main

import (
    // ...
)

type blogItem struct {
    // ...
}

// coll is the package-level MongoDB collection handle used by the handlers.
// NOTE(review): presumably assigned in main, whose body is elided here.
var coll *mongo.Collection

// server is the receiver type for the BlogService handlers.
type server struct{}

func (*server) CreateBlog(ctx context.Context, req *blogpb.CreateBlogRequest) (*blogpb.CreateBlogResponse, error) {
    // ...
}

func (*server) ReadBlog(ctx context.Context, req *blogpb.ReadBlogRequest) (*blogpb.ReadBlogResponse, error) {
    // ...
}

// UpdateBlog replaces the author, title and content of an existing blog,
// identified by the hex-encoded ObjectID in the request's blog.
//
// It returns codes.InvalidArgument when the id cannot be parsed,
// codes.NotFound when no document has that id, and codes.Internal when the
// lookup or the replacement fails for any other reason.
func (*server) UpdateBlog(
    ctx context.Context, req *blogpb.UpdateBlogRequest,
) (
    *blogpb.UpdateBlogResponse, error,
) {
    log.Println("Updating blog request")
    blog := req.GetBlog()

    oid, err := primitive.ObjectIDFromHex(blog.GetId())
    if err != nil {
        return nil, status.Error(codes.InvalidArgument, "Cannot parse ID")
    }

    // Load the stored document first so the replacement keeps its _id.
    data := &blogItem{}
    filter := bson.D{{Key: "_id", Value: oid}}

    // Use the request context so the database calls are abandoned when the
    // caller cancels or its deadline expires.
    if err := coll.FindOne(ctx, filter).Decode(data); err != nil {
        // Only a missing document is NotFound; connection or decoding
        // problems must not be reported as "no such blog".
        if err == mongo.ErrNoDocuments {
            return nil, status.Errorf(codes.NotFound, "Cannot find blog with specified ID %v", err)
        }
        return nil, status.Errorf(codes.Internal, "Cannot read blog: %v", err)
    }

    data.AuthorID = blog.GetAuthorId()
    data.Content = blog.GetContent()
    data.Title = blog.GetTitle()

    if _, err := coll.ReplaceOne(ctx, filter, data); err != nil {
        return nil, status.Errorf(codes.Internal, "Cannot update object:%v", err)
    }

    return &blogpb.UpdateBlogResponse{
        Blog: dataToBlogpb(data),
    }, nil
}

// dataToBlogpb converts a stored blogItem into its protobuf representation,
// hex-encoding the ObjectID into the Id field.
func dataToBlogpb(data *blogItem) *blogpb.Blog {
    pb := new(blogpb.Blog)
    pb.Id = data.ID.Hex()
    pb.AuthorId = data.AuthorID
    pb.Content = data.Content
    pb.Title = data.Title
    return pb
}

func main() {
    // ...
}

lesson52

package main

import (
    // ...
)

// main dials the blog server over an insecure connection and updates one
// blog, identified by a hard-coded id, with new field values.
func main() {
    fmt.Println("blog client")

    cc, dialErr := grpc.Dial("localhost:50051", grpc.WithInsecure())
    if dialErr != nil {
        log.Fatalf("could not connect: %v", dialErr)
    }
    defer cc.Close()

    client := blogpb.NewBlogServiceClient(cc)

    // New field values for the existing blog.
    changed := new(blogpb.Blog)
    changed.Id = "64a5f45f94466f3e02f74f79"
    changed.AuthorId = "changer"
    changed.Title = "My change Blog"
    changed.Content = "Content of the change blog"

    update(client, changed)
}

func create(c blogpb.BlogServiceClient, blog *blogpb.Blog) {
    // ...
}

func findOne(c blogpb.BlogServiceClient, id string) {
    // ...
}

// update sends blog to the server via UpdateBlog and logs the updated blog
// returned by the server. Any RPC error terminates the program via
// log.Fatalf.
func update(c blogpb.BlogServiceClient, blog *blogpb.Blog) {
    log.Println("Updating the blog")
    uptRes, uptErr := c.UpdateBlog(context.Background(), &blogpb.UpdateBlogRequest{
        Blog: blog,
    })
    if uptErr != nil {
        log.Fatalf("Error while updating: %v", uptErr)
    }
    // The message previously said "read", copied from the read helper.
    log.Println("Blog was updated:", uptRes)
}

lesson53

syntax = "proto3";

package blog;

option go_package = "./blog/blogpb";

// A single blog post as exchanged over the API.
message Blog {
    string id = 1;
    string author_id = 2;
    string title = 3;
    string content = 4;
}

// ...

// Request for DeleteBlog; identifies the blog to delete by id.
message DeleteBlogRequest {
    string blog_id = 1;
}

// Response for DeleteBlog; carries a blog id.
message DeleteBlogResponse {
    string blog_id = 1;
}

// Blog service with the create, read, update and delete RPCs.
service BlogService {
    rpc CreateBlog (CreateBlogRequest) returns (CreateBlogResponse);
    // return NOT FOUND if not found
    rpc ReadBlog (ReadBlogRequest) returns (ReadBlogResponse);
    rpc UpdateBlog (UpdateBlogRequest) returns (UpdateBlogResponse);
    rpc DeleteBlog (DeleteBlogRequest) returns (DeleteBlogResponse);
}
package main

import (
    // ...
)

type blogItem struct {
    // ...
}

// coll is the package-level MongoDB collection handle used by the handlers.
// NOTE(review): presumably assigned in main, whose body is elided here.
var coll *mongo.Collection

// server is the receiver type for the BlogService handlers.
type server struct{}

func (*server) CreateBlog(ctx context.Context, req *blogpb.CreateBlogRequest) (*blogpb.CreateBlogResponse, error) {
    // ...
}

func (*server) ReadBlog(ctx context.Context, req *blogpb.ReadBlogRequest) (*blogpb.ReadBlogResponse, error) {
    // ...
}

func (*server) UpdateBlog(
    ctx context.Context, req *blogpb.UpdateBlogRequest,
) (
    *blogpb.UpdateBlogResponse, error,
) {
    // ...
}

// DeleteBlog removes the blog identified by the hex-encoded ObjectID in the
// request and echoes the id back on success.
//
// It returns codes.InvalidArgument when the id cannot be parsed,
// codes.Internal when the delete operation fails, and codes.NotFound when no
// document matched the id.
func (*server) DeleteBlog(
    ctx context.Context, req *blogpb.DeleteBlogRequest,
) (
    *blogpb.DeleteBlogResponse, error,
) {
    log.Println("Deleting blog request")
    oid, err := primitive.ObjectIDFromHex(req.GetBlogId())
    if err != nil {
        return nil, status.Error(codes.InvalidArgument, "Cannot parse ID")
    }

    filter := bson.D{{Key: "_id", Value: oid}}
    // Use the request context so the delete is abandoned when the caller
    // cancels or its deadline expires.
    res, err := coll.DeleteOne(ctx, filter)
    if err != nil {
        return nil, status.Errorf(codes.Internal, "Cannot delete object: %v", err)
    }

    if res.DeletedCount == 0 {
        // err is nil on this path, so report the id that was not found and
        // use NotFound rather than Internal.
        return nil, status.Errorf(codes.NotFound, "Cannot find blog with ID: %v", req.GetBlogId())
    }

    return &blogpb.DeleteBlogResponse{BlogId: req.GetBlogId()}, nil
}

func dataToBlogpb(data *blogItem) *blogpb.Blog {
    // ...
}

func main() {
    // ...
}

lesson54

package main

import (
    // ...
)

// main dials the blog server over an insecure connection and deletes one
// blog by a hard-coded id.
func main() {
    fmt.Println("blog client")

    cc, dialErr := grpc.Dial("localhost:50051", grpc.WithInsecure())
    if dialErr != nil {
        log.Fatalf("could not connect: %v", dialErr)
    }
    defer cc.Close()

    const blogID = "64a627df7303d9f27d4f19a8"
    delete(blogpb.NewBlogServiceClient(cc), blogID)
}

func create(c blogpb.BlogServiceClient, blog *blogpb.Blog) {
    // ...
}

func findOne(c blogpb.BlogServiceClient, id string) {
    // ...
}

func update(c blogpb.BlogServiceClient, blog *blogpb.Blog) {
    // ...
}

// delete asks the server to remove the blog with the given id via DeleteBlog
// and logs the response. Any RPC error terminates the program via
// log.Fatalf.
//
// NOTE(review): this name shadows Go's built-in delete within the package;
// it is kept because main calls it by this name.
func delete(c blogpb.BlogServiceClient, id string) {
    log.Println("Deleting the blog")
    delRes, delErr := c.DeleteBlog(context.Background(), &blogpb.DeleteBlogRequest{
        BlogId: id,
    })
    if delErr != nil {
        log.Fatalf("Error while deleting: %v", delErr)
    }
    // The message previously said "read", copied from the read helper.
    log.Println("Blog was deleted:", delRes)
}

lesson55

syntax = "proto3";

package blog;

option go_package = "./blog/blogpb";

// A single blog post as exchanged over the API.
message Blog {
    string id = 1;
    string author_id = 2;
    string title = 3;
    string content = 4;
}

// ...

// Request for ListBlog; it has no fields.
message ListBlogRequest {

}

// One element of the ListBlog response stream; carries a single blog.
message ListBlogResponse {
    Blog blog = 1;
}

// Blog service with the CRUD RPCs plus a server-streaming list RPC.
service BlogService {
    rpc CreateBlog (CreateBlogRequest) returns (CreateBlogResponse);
    // return NOT FOUND if not found
    rpc ReadBlog (ReadBlogRequest) returns (ReadBlogResponse);
    rpc UpdateBlog (UpdateBlogRequest) returns (UpdateBlogResponse);
    rpc DeleteBlog (DeleteBlogRequest) returns (DeleteBlogResponse);
    rpc ListBlog (ListBlogRequest) returns (stream ListBlogResponse);
}
package main

import (
    // ...
)

type blogItem struct {
    // ...
}

// coll is the package-level MongoDB collection handle used by the handlers.
// NOTE(review): presumably assigned in main, whose body is elided here.
var coll *mongo.Collection

// server is the receiver type for the BlogService handlers.
type server struct{}

func (*server) CreateBlog(ctx context.Context, req *blogpb.CreateBlogRequest) (*blogpb.CreateBlogResponse, error) {
    // ...
}

func (*server) ReadBlog(ctx context.Context, req *blogpb.ReadBlogRequest) (*blogpb.ReadBlogResponse, error) {
    // ...
}

func (*server) UpdateBlog(
    ctx context.Context, req *blogpb.UpdateBlogRequest,
) (
    *blogpb.UpdateBlogResponse, error,
) {
    // ...
}

func (*server) DeleteBlog(
    ctx context.Context, req *blogpb.DeleteBlogRequest,
) (
    *blogpb.DeleteBlogResponse, error,
) {
    // ...
}

// ListBlog streams every blog in the collection to the client, one
// ListBlogResponse per document.
//
// It returns codes.Internal when the query, decoding or cursor iteration
// fails, and returns the stream's own error when sending to the client
// fails.
func (*server) ListBlog(
    req *blogpb.ListBlogRequest, stream blogpb.BlogService_ListBlogServer,
) error {
    log.Println("List blog request")

    // Tie the query to the stream's context so iteration stops when the
    // client disconnects or cancels.
    ctx := stream.Context()

    // An empty filter matches every document.
    cur, err := coll.Find(ctx, bson.D{})
    if err != nil {
        return status.Errorf(codes.Internal, "Unknown internal error: %v", err)
    }
    // Close with a fresh context so the cursor is released even if ctx has
    // already been canceled.
    defer cur.Close(context.Background())

    for cur.Next(ctx) {
        data := &blogItem{}
        if err := cur.Decode(data); err != nil {
            return status.Errorf(codes.Internal, "Error while decoding data: %v", err)
        }
        // A failed Send means the client is gone or the stream is broken;
        // stop instead of reading the rest of the collection.
        if err := stream.Send(&blogpb.ListBlogResponse{
            Blog: dataToBlogpb(data),
        }); err != nil {
            return err
        }
    }
    if err := cur.Err(); err != nil {
        return status.Errorf(codes.Internal, "Unknown internal error: %v", err)
    }
    return nil
}

func dataToBlogpb(data *blogItem) *blogpb.Blog {
    // ...
}

func main() {
    // ...
}

lesson56

package main

import (
    // ...
)

// main dials the blog server over an insecure connection and lists all
// blogs.
func main() {
    fmt.Println("blog client")

    cc, dialErr := grpc.Dial("localhost:50051", grpc.WithInsecure())
    if dialErr != nil {
        log.Fatalf("could not connect: %v", dialErr)
    }
    defer cc.Close()

    // Stream and print every blog.
    list(blogpb.NewBlogServiceClient(cc))
}

func create(c blogpb.BlogServiceClient, blog *blogpb.Blog) {
    // ...
}

func findOne(c blogpb.BlogServiceClient, id string) {
    // ...
}

func update(c blogpb.BlogServiceClient, blog *blogpb.Blog) {
    // ...
}

func delete(c blogpb.BlogServiceClient, id string) {
    // ...
}

// list opens the ListBlog server stream and logs each blog it receives
// until the server closes the stream. Any RPC error terminates the program
// via log.Fatalf.
func list(c blogpb.BlogServiceClient) {
    stream, openErr := c.ListBlog(context.Background(), &blogpb.ListBlogRequest{})
    if openErr != nil {
        log.Fatalf("error while calling ListBlog RPC: %v", openErr)
    }

    for {
        item, recvErr := stream.Recv()
        switch {
        case recvErr == io.EOF:
            // The server finished sending.
            return
        case recvErr != nil:
            log.Fatalf("error while reading ListBlog RPC: %v", recvErr)
        }
        log.Println(item.GetBlog())
    }
}

lesson57


evans -p 50051 -r

lesson58






 上一篇
Rust 编程:完整的开发者指南 Rust 编程:完整的开发者指南
视频链接哔哩哔哩 代码 11-1 intro 1-2 data types 1-3 variables 变量是将数据分配到临时内存位置的一种方式 变量默认不可变,使用 mut 关键字可以使之可变 1-4 functions 1-5
2023-07-04
下一篇 
Software Architecture & Technology of Large-Scale Systems Software Architecture & Technology of Large-Scale Systems
1. Introduction 2. Performance1. Module contents overview 2. A reference software system for discussing performance 3. W
2023-07-01
  目录