Golang分布式注册中心实现流程讲解(golang分布式计算框架)学会了吗

随心笔谈12个月前发布 admin
92 0

package registry
import (
“bytes”
“encoding/json”
“fmt”
“io/ioutil”
“log”
“net/http”
“sync”
“time”
)
const (
	// ServerPort is the ":port" suffix used to build ServicesURL.
	ServerPort = ":3000"
	// ServicesURL is the address of the registry's services endpoint.
	ServicesURL = "http://localhost" + ServerPort + "/services"
)
// registry is the collection of registered services.
type registry struct {
// registrations lists the services currently known to the registry.
registrations []Registration
// mutex is locked by the registry methods around their accesses to
// registrations. It is a pointer, so copies of a registry value share it.
mutex *sync.RWMutex
}
// add appends reg to the registry, sends reg the already-registered services
// it requires, and then notifies the registered services that reg has been
// added.
//
// The returned error is the one produced while sending reg its required
// services; the notification is issued whether or not that step failed.
func (r *registry) add(reg Registration) error {
	r.mutex.Lock()
	r.registrations = append(r.registrations, reg)
	r.mutex.Unlock()

	sendErr := r.sendRequiredServices(reg)

	added := patchEntry{Name: reg.ServiceName, URL: reg.ServiceURL}
	r.notify(patch{Added: []patchEntry{added}})

	return sendErr
}
// notify tells registered services about the changes in fullPatch that
// concern them.
//
// One goroutine is started per registration. For each service name that the
// registration requires, the goroutine collects the Added and Removed entries
// of fullPatch carrying that name and, if there are any, posts them to the
// registration's ServiceUpdateURL. A failed send is logged and ends the
// goroutine for that registration.
//
// The read lock is held while the goroutines are started; notify does not
// wait for them to finish.
func (r registry) notify(fullPatch patch) {
	r.mutex.RLock()
	defer r.mutex.RUnlock()

	for _, svc := range r.registrations {
		go func(svc Registration) {
			for _, required := range svc.RequiredServices {
				// Start from empty (non-nil) slices, as the patch is serialized as-is.
				update := patch{Added: []patchEntry{}, Removed: []patchEntry{}}
				for _, entry := range fullPatch.Added {
					if entry.Name == required {
						update.Added = append(update.Added, entry)
					}
				}
				for _, entry := range fullPatch.Removed {
					if entry.Name == required {
						update.Removed = append(update.Removed, entry)
					}
				}

				// Nothing in the patch concerns this dependency.
				if len(update.Added) == 0 && len(update.Removed) == 0 {
					continue
				}

				if err := r.sendPatch(update, svc.ServiceUpdateURL); err != nil {
					log.Println(err)
					return
				}
			}
		}(svc)
	}
}
// sendRequiredServices sends reg the registered services that it depends on.
//
// It scans the current registrations under the read lock, collects an Added
// entry for every registration whose ServiceName appears in
// reg.RequiredServices, and posts the resulting patch to
// reg.ServiceUpdateURL. The patch is sent even when no match was found.
func (r registry) sendRequiredServices(reg Registration) error {
	r.mutex.RLock()
	defer r.mutex.RUnlock()

	var deps patch
	for _, candidate := range r.registrations {
		for _, wanted := range reg.RequiredServices {
			if candidate.ServiceName != wanted {
				continue
			}
			deps.Added = append(deps.Added, patchEntry{
				Name: candidate.ServiceName,
				URL:  candidate.ServiceURL,
			})
		}
	}

	return r.sendPatch(deps, reg.ServiceUpdateURL)
}
// 告诉客户端更新,最新的服务列表是这个
func (r registry) sendPatch(p patch, url string) error {
d, err :=json.Marshal(p)
if err !=nil {
return err
}
_, err=http.Post(url, “application/json”, bytes.NewBuffer(d))
if err !=nil {
return err
}
return nil
}
// 注册中心删除服务对象
func (r *registry) remove(url string) error {
for i :=range reg.registrations {
if reg.registrations[i].ServiceURL==url {
// 通知客户端更新对象信息
r.notify(patch{
Removed: []patchEntry{
{
Name: r.registrations[i].ServiceName,
URL: r.registrations[i].ServiceURL,
},
},
})
r.mutex.Lock()
reg.registrations=append(reg.registrations[:i], reg.registrations[i+1:]…)
r.mutex.Unlock()
return nil
}
}
return fmt.Errorf(“Service at URL %s not found”, url)
}
// 心跳检测
func (r *registry) heartbeat(freq time.Duration) {
for {
var wg sync.WaitGroup
for _, reg :=range r.registrations {
wg.Add(1)
go func(reg Registration) {
defer wg.Done()
success :=true
for attemps :=0; attemps < 3; attemps++ {
res, err :=http.Get(reg.HeartbeatURL)
if err !=nil {
log.Println(err)
} else if res.StatusCode==http.StatusOK {
log.Printf(“Heartbeat check passed for %v”, reg.ServiceName)
// 如果心跳恢复了,把服务重新注册回来
if !success {
r.add(reg)
}
break;
}
// 如果执行到这就代表着心跳没有响应,那就代表着需要回收注销该服务了
log.Printf(“Heartbeat check failed for %v”, reg.ServiceName)
if success {
success=false
r.remove(reg.ServiceURL)
}
time.Sleep(1 * time.Second)
}
}(reg)
wg.Wait()
time.Sleep(freq)
}
}
}
// once ensures the heartbeat loop is started a single time.
var once sync.Once

// SetupRegistryService starts the registry's heartbeat loop in a background
// goroutine with a three-second interval. Calls after the first have no
// effect.
func SetupRegistryService() {
	startHeartbeat := func() {
		go reg.heartbeat(3 * time.Second)
	}
	once.Do(startHeartbeat)
}
// reg is the package-level registry instance. It starts with an empty,
// non-nil registration list and its own RWMutex.
var reg = registry{
	registrations: []Registration{},
	mutex:         &sync.RWMutex{},
}
type RegistryService struct{}
func (s RegistryService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Println(“Request received”)
switch r.Method {
case http.MethodPost:
dec :=json.NewDecoder(r.Body)
var r Registration
err :=dec.Decode(&r)
if err !=nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}
log.Printf(“Adding service: %v with URL: %s\n”, r.ServiceName,
r.ServiceURL)
err=reg.add(r)
if err !=nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}
case http.MethodDelete:
payload, err :=ioutil.ReadAll(r.Body)
if err !=nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
url :=string(payload)
log.Printf(“Removing service at URL: %s”, url)
err=reg.remove(url)
if err !=nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
default:
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
}

© 版权声明

相关文章