Update redis schema.

This commit is contained in:
Naoki Kosaka 2018-11-07 20:41:11 +09:00
parent 54e6a11010
commit 5cc8acb500
2 changed files with 37 additions and 28 deletions

View File

@ -66,10 +66,10 @@ func contains(entries interface{}, finder string) bool {
} }
func pushRelayJob(sourceInbox string, body []byte) { func pushRelayJob(sourceInbox string, body []byte) {
receivers, _ := RedClient.Keys("subscription:*").Result() domains, _ := redClient.Keys("relay:subscription:*").Result()
for _, receiver := range receivers { for _, domain := range domains {
if sourceInbox != strings.Replace(receiver, "subscription:", "", 1) { if sourceInbox != strings.Replace(domain, "relay:subscription:", "", 1) {
inboxURL, _ := RedClient.HGet(receiver, "inbox_url").Result() inboxURL, _ := redClient.HGet(domain, "inbox_url").Result()
job := &tasks.Signature{ job := &tasks.Signature{
Name: "relay", Name: "relay",
RetryCount: 0, RetryCount: 0,
@ -86,7 +86,7 @@ func pushRelayJob(sourceInbox string, body []byte) {
}, },
}, },
} }
_, err := MacServer.SendTask(job) _, err := macServer.SendTask(job)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
} }
@ -111,7 +111,7 @@ func pushRegistorJob(inboxURL string, body []byte) {
}, },
}, },
} }
_, err := MacServer.SendTask(job) _, err := macServer.SendTask(job)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
} }
@ -127,8 +127,8 @@ func followAcceptable(activity *activitypub.Activity, actor *activitypub.Actor)
func suitableFollow(activity *activitypub.Activity, actor *activitypub.Actor) bool { func suitableFollow(activity *activitypub.Activity, actor *activitypub.Actor) bool {
domain, _ := url.Parse(activity.Actor) domain, _ := url.Parse(activity.Actor)
receivers, _ := RedClient.Exists("blocked_domain:" + domain.Host).Result() blocked, _ := redClient.HExists("relay:config:blockedDomain", domain.Host).Result()
if receivers != 0 { if blocked {
return false return false
} }
return true return true
@ -139,8 +139,8 @@ func relayAcceptable(activity *activitypub.Activity, actor *activitypub.Actor) e
return errors.New("Activity should contain https://www.w3.org/ns/activitystreams#Public as receiver") return errors.New("Activity should contain https://www.w3.org/ns/activitystreams#Public as receiver")
} }
domain, _ := url.Parse(activity.Actor) domain, _ := url.Parse(activity.Actor)
receivers, _ := RedClient.Exists("subscription:" + domain.Host).Result() exist, _ := redClient.Exists("relay:subscription:" + domain.Host).Result()
if receivers == 0 { if exist == 0 {
return errors.New("To use the relay service, Subscribe me in advance") return errors.New("To use the relay service, Subscribe me in advance")
} }
return nil return nil
@ -148,8 +148,8 @@ func relayAcceptable(activity *activitypub.Activity, actor *activitypub.Actor) e
func suitableRelay(activity *activitypub.Activity, actor *activitypub.Actor) bool { func suitableRelay(activity *activitypub.Activity, actor *activitypub.Actor) bool {
domain, _ := url.Parse(activity.Actor) domain, _ := url.Parse(activity.Actor)
receivers, _ := RedClient.Exists("limited_domain:" + domain.Host).Result() limited, _ := redClient.HExists("relay:config:limitedDomain", domain.Host).Result()
if receivers != 0 { if limited {
return false return false
} }
if relConfig.blockService && actor.Type == "Service" { if relConfig.blockService && actor.Type == "Service" {
@ -177,11 +177,11 @@ func handleInbox(w http.ResponseWriter, r *http.Request) {
var responseType string var responseType string
if suitableFollow(activity, actor) { if suitableFollow(activity, actor) {
responseType = "Accept" responseType = "Accept"
RedClient.HSet("subscription:"+domain.Host, "inbox_url", actor.Endpoints.SharedInbox) redClient.HSet("relay:subscription:"+domain.Host, "inbox_url", actor.Endpoints.SharedInbox)
} else { } else {
responseType = "Reject" responseType = "Reject"
} }
resp := activitypub.GenerateActivityResponse(Hostname, domain, responseType, *activity) resp := activitypub.GenerateActivityResponse(hostname, domain, responseType, *activity)
jsonData, _ := json.Marshal(&resp) jsonData, _ := json.Marshal(&resp)
go pushRegistorJob(actor.Inbox, jsonData) go pushRegistorJob(actor.Inbox, jsonData)
@ -193,7 +193,7 @@ func handleInbox(w http.ResponseWriter, r *http.Request) {
nestedActivity, _ := activitypub.DescribeNestedActivity(activity.Object) nestedActivity, _ := activitypub.DescribeNestedActivity(activity.Object)
if nestedActivity.Type == "Follow" && nestedActivity.Actor == activity.Actor { if nestedActivity.Type == "Follow" && nestedActivity.Actor == activity.Actor {
domain, _ := url.Parse(activity.Actor) domain, _ := url.Parse(activity.Actor)
RedClient.Del("subscription:" + domain.Host) redClient.Del("relay:subscription:" + domain.Host)
fmt.Println("Accept Unfollow Request : ", activity.Actor) fmt.Println("Accept Unfollow Request : ", activity.Actor)
w.WriteHeader(202) w.WriteHeader(202)

35
main.go
View File

@ -14,11 +14,6 @@ import (
"github.com/yukimochi/Activity-Relay/KeyLoader" "github.com/yukimochi/Activity-Relay/KeyLoader"
) )
var Hostname *url.URL
var Hostkey *rsa.PrivateKey
var RedClient *redis.Client
var MacServer *machinery.Server
// Actor : Relay's Actor // Actor : Relay's Actor
var Actor activitypub.Actor var Actor activitypub.Actor
@ -29,12 +24,16 @@ type relayConfig struct {
blockService bool blockService bool
} }
var hostname *url.URL
var hostkey *rsa.PrivateKey
var redClient *redis.Client
var macServer *machinery.Server
var relConfig relayConfig var relConfig relayConfig
func loadConfig() relayConfig { func loadConfig() relayConfig {
blockService, err := RedClient.HGet("relay:config", "block_service").Result() blockService, err := redClient.HGet("relay:config", "block_service").Result()
if err != nil { if err != nil {
RedClient.HSet("relay:config", "block_service", 0) redClient.HSet("relay:config", "block_service", 0)
blockService = "0" blockService = "0"
} }
return relayConfig{ return relayConfig{
@ -61,15 +60,15 @@ func main() {
} }
var err error var err error
Hostkey, err = keyloader.ReadPrivateKeyRSAfromPath(pemPath) hostkey, err = keyloader.ReadPrivateKeyRSAfromPath(pemPath)
if err != nil { if err != nil {
panic("Can't read Hostkey Pemfile") panic("Can't read Hostkey Pemfile")
} }
Hostname, err = url.Parse("https://" + relayDomain) hostname, err = url.Parse("https://" + relayDomain)
if err != nil { if err != nil {
panic("Can't parse Relay Domain") panic("Can't parse Relay Domain")
} }
RedClient = redis.NewClient(&redis.Options{ redClient = redis.NewClient(&redis.Options{
Addr: redisURL, Addr: redisURL,
}) })
@ -80,13 +79,13 @@ func main() {
ResultsExpireIn: 5, ResultsExpireIn: 5,
} }
MacServer, err = machinery.NewServer(macConfig) macServer, err = machinery.NewServer(macConfig)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
} }
Actor = activitypub.GenerateActor(Hostname, &Hostkey.PublicKey) Actor = activitypub.GenerateActor(hostname, &hostkey.PublicKey)
WebfingerResource = activitypub.GenerateWebfingerResource(Hostname, &Actor) WebfingerResource = activitypub.GenerateWebfingerResource(hostname, &Actor)
// Load Config // Load Config
relConfig = loadConfig() relConfig = loadConfig()
@ -99,5 +98,15 @@ func main() {
fmt.Println("RELAY DOMAIN : ", relayDomain) fmt.Println("RELAY DOMAIN : ", relayDomain)
fmt.Println("REDIS URL : ", redisURL) fmt.Println("REDIS URL : ", redisURL)
fmt.Println("BIND ADDRESS : ", relayBind) fmt.Println("BIND ADDRESS : ", relayBind)
fmt.Println(" - Blocked Domain")
domains, _ := redClient.HKeys("relay:config:blockedDomain").Result()
for _, domain := range domains {
fmt.Println(domain)
}
fmt.Println(" - Limited Domain")
domains, _ = redClient.HKeys("relay:config:limitedDomain").Result()
for _, domain := range domains {
fmt.Println(domain)
}
http.ListenAndServe(relayBind, nil) http.ListenAndServe(relayBind, nil)
} }