Move worker to deliver.

This commit is contained in:
Naoki Kosaka 2021-06-18 22:02:49 +09:00
parent 99f870e0f2
commit aafac21664
12 changed files with 180 additions and 148 deletions

View File

@ -3,7 +3,6 @@ package api
import (
"fmt"
"net/http"
"net/url"
"time"
"github.com/RichardKnop/machinery/v1"
@ -24,7 +23,6 @@ var (
// Nodeinfo : Relay's Nodeinfo
Nodeinfo models.NodeinfoResources
hostURL *url.URL
relayState models.RelayState
machineryServer *machinery.Server
actorCache *cache.Cache
@ -66,9 +64,8 @@ func initialize(globalConfig *models.RelayConfig) error {
Actor = models.NewActivityPubActorFromSelfKey(globalConfig)
actorCache = cache.New(5*time.Minute, 10*time.Minute)
hostURL = globalConfig.ServerHostname()
WebfingerResource.GenerateFromActor(hostURL, &Actor)
Nodeinfo.GenerateFromActor(hostURL, &Actor, version)
WebfingerResource.GenerateFromActor(globalConfig.ServerHostname(), &Actor)
Nodeinfo.GenerateFromActor(globalConfig.ServerHostname(), &Actor, version)
return nil
}

View File

@ -26,7 +26,7 @@ func decodeActivity(request *http.Request) (*models.Activity, *models.Actor, []b
}
KeyID := verifier.KeyId()
keyOwnerActor := new(models.Actor)
err = keyOwnerActor.RetrieveRemoteActor(KeyID, fmt.Sprintf("%s (golang net/http; Activity-Relay %s; %s)", globalConfig.ServerServicename(), version, hostURL.Host), actorCache)
err = keyOwnerActor.RetrieveRemoteActor(KeyID, fmt.Sprintf("%s (golang net/http; Activity-Relay %s; %s)", globalConfig.ServerServicename(), version, globalConfig.ServerHostname().Host), actorCache)
if err != nil {
return nil, nil, nil, err
}
@ -61,7 +61,7 @@ func decodeActivity(request *http.Request) (*models.Activity, *models.Actor, []b
}
var remoteActor models.Actor
err = remoteActor.RetrieveRemoteActor(activity.Actor, fmt.Sprintf("%s (golang net/http; Activity-Relay %s; %s)", globalConfig.ServerServicename(), version, hostURL.Host), actorCache)
err = remoteActor.RetrieveRemoteActor(activity.Actor, fmt.Sprintf("%s (golang net/http; Activity-Relay %s; %s)", globalConfig.ServerServicename(), version, globalConfig.ServerHostname().Host), actorCache)
if err != nil {
return nil, nil, nil, err
}

View File

@ -214,7 +214,7 @@ func handleInbox(writer http.ResponseWriter, request *http.Request, activityDeco
case "Follow":
err = followAcceptable(activity, actor)
if err != nil {
resp := activity.GenerateResponse(hostURL, "Reject")
resp := activity.GenerateResponse(globalConfig.ServerHostname(), "Reject")
jsonData, _ := json.Marshal(&resp)
go pushRegistorJob(actor.Inbox, jsonData)
fmt.Println("Reject Follow Request : ", err.Error(), activity.Actor)
@ -233,7 +233,7 @@ func handleInbox(writer http.ResponseWriter, request *http.Request, activityDeco
})
fmt.Println("Pending Follow Request : ", activity.Actor)
} else {
resp := activity.GenerateResponse(hostURL, "Accept")
resp := activity.GenerateResponse(globalConfig.ServerHostname(), "Accept")
jsonData, _ := json.Marshal(&resp)
go pushRegistorJob(actor.Inbox, jsonData)
relayState.AddSubscription(models.Subscription{
@ -245,7 +245,7 @@ func handleInbox(writer http.ResponseWriter, request *http.Request, activityDeco
fmt.Println("Accept Follow Request : ", activity.Actor)
}
} else {
resp := activity.GenerateResponse(hostURL, "Reject")
resp := activity.GenerateResponse(globalConfig.ServerHostname(), "Reject")
jsonData, _ := json.Marshal(&resp)
go pushRegistorJob(actor.Inbox, jsonData)
fmt.Println("Reject Follow Request : ", activity.Actor)
@ -297,7 +297,7 @@ func handleInbox(writer http.ResponseWriter, request *http.Request, activityDeco
}
switch nestedObject.Type {
case "Note":
resp := nestedObject.GenerateAnnounce(hostURL)
resp := nestedObject.GenerateAnnounce(globalConfig.ServerHostname())
jsonData, _ := json.Marshal(&resp)
go pushRelayJob(domain.Host, jsonData)
fmt.Println("Accept Announce Note : ", activity.Actor)

View File

@ -26,7 +26,7 @@ func TestHandleWebfingerGet(t *testing.T) {
req, _ := http.NewRequest("GET", s.URL, nil)
q := req.URL.Query()
q.Add("resource", "acct:relay@"+hostURL.Host)
q.Add("resource", "acct:relay@"+globalConfig.ServerHostname().Host)
req.URL.RawQuery = q.Encode()
client := new(http.Client)
r, err := client.Do(req)
@ -49,7 +49,7 @@ func TestHandleWebfingerGet(t *testing.T) {
}
domain, _ := url.Parse(wfresource.Links[0].Href)
if domain.Host != hostURL.Host {
if domain.Host != globalConfig.ServerHostname().Host {
t.Fatalf("WebfingerResource's Host not valid.")
}
}
@ -193,7 +193,7 @@ func TestHandleActorGet(t *testing.T) {
}
domain, _ := url.Parse(actor.ID)
if domain.Host != hostURL.Host {
if domain.Host != globalConfig.ServerHostname().Host {
t.Fatalf("Actor's Host not valid.")
}
}

93
deliver/deriver.go Normal file
View File

@ -0,0 +1,93 @@
package deliver
import (
"fmt"
"net/http"
"net/url"
"os"
"time"
"github.com/RichardKnop/machinery/v1"
"github.com/RichardKnop/machinery/v1/log"
"github.com/go-redis/redis"
uuid "github.com/satori/go.uuid"
"github.com/yukimochi/Activity-Relay/models"
)
var (
version string
globalConfig *models.RelayConfig
// Actor : Relay's Actor
Actor models.Actor
redisClient *redis.Client
machineryServer *machinery.Server
httpClient *http.Client
)
func relayActivity(args ...string) error {
inboxURL := args[0]
body := args[1]
err := sendActivity(inboxURL, Actor.ID, []byte(body), globalConfig.ActorKey())
if err != nil {
domain, _ := url.Parse(inboxURL)
eval_script := "local change = redis.call('HSETNX',KEYS[1], 'last_error', ARGV[1]); if change == 1 then redis.call('EXPIRE', KEYS[1], ARGV[2]) end;"
redisClient.Eval(eval_script, []string{"relay:statistics:" + domain.Host}, err.Error(), 60).Result()
}
return err
}
func registorActivity(args ...string) error {
inboxURL := args[0]
body := args[1]
err := sendActivity(inboxURL, Actor.ID, []byte(body), globalConfig.ActorKey())
return err
}
func Entrypoint(g *models.RelayConfig, v string) error {
var err error
globalConfig = g
version = v
err = initialize(globalConfig)
if err != nil {
return err
}
err = machineryServer.RegisterTask("registor", registorActivity)
if err != nil {
return err
}
err = machineryServer.RegisterTask("relay", relayActivity)
if err != nil {
return err
}
workerID := uuid.NewV4()
worker := machineryServer.NewWorker(workerID.String(), globalConfig.JobConcurrency())
err = worker.Launch()
if err != nil {
fmt.Fprintln(os.Stderr, err)
}
return nil
}
func initialize(globalConfig *models.RelayConfig) error {
var err error
redisClient = globalConfig.RedisClient()
machineryServer, err = models.NewMachineryServer(globalConfig)
if err != nil {
return err
}
httpClient = &http.Client{Timeout: time.Duration(5) * time.Second}
Actor = models.NewActivityPubActorFromSelfKey(globalConfig)
newNullLogger := NewNullLogger()
log.DEBUG = newNullLogger
return nil
}

View File

@ -1,6 +1,7 @@
package main
package deliver
import (
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
@ -9,15 +10,33 @@ import (
"testing"
"github.com/spf13/viper"
"github.com/yukimochi/Activity-Relay/models"
)
func TestMain(m *testing.M) {
viper.Set("actor_pem", "../misc/testKey.pem")
viper.Set("relay_domain", "relay.yukimochi.example.org")
initConfig()
redisClient.FlushAll().Result()
var err error
// Load Config
testConfigPath := "../misc/config.yml"
file, _ := os.Open(testConfigPath)
defer file.Close()
viper.SetConfigType("yaml")
viper.ReadConfig(file)
viper.Set("ACTOR_PEM", "../misc/testKey.pem")
viper.BindEnv("REDIS_URL")
globalConfig, err = models.NewRelayConfig()
if err != nil {
fmt.Println(err.Error())
os.Exit(1)
}
err = initialize(globalConfig)
if err != nil {
fmt.Println(err.Error())
os.Exit(1)
}
redisClient.FlushAll().Result()
code := m.Run()
os.Exit(code)
}
@ -52,7 +71,7 @@ func TestRelayActivityNoHost(t *testing.T) {
t.Fatal("Failed - Error not reported.")
}
domain, _ := url.Parse("http://nohost.example.jp")
data, err := redisClient.HGet("relay:statistics:"+domain.Host, "last_error").Result()
data, _ := redisClient.HGet("relay:statistics:"+domain.Host, "last_error").Result()
if data == "" {
t.Fatal("Failed - Error not cached.")
}
@ -70,7 +89,7 @@ func TestRelayActivityResp500(t *testing.T) {
t.Fatal("Failed - Error not reported.")
}
domain, _ := url.Parse(s.URL)
data, err := redisClient.HGet("relay:statistics:"+domain.Host, "last_error").Result()
data, _ := redisClient.HGet("relay:statistics:"+domain.Host, "last_error").Result()
if data == "" {
t.Fatal("Failed - Error not cached.")
}

View File

@ -1,4 +1,4 @@
package main
package deliver
// NullLogger : Null logger for debug output
type NullLogger struct {

View File

@ -1,4 +1,4 @@
package main
package deliver
import (
"bytes"
@ -11,7 +11,6 @@ import (
"time"
httpdate "github.com/Songmu/go-httpdate"
"github.com/spf13/viper"
"github.com/yukimochi/httpsig"
)
@ -36,7 +35,7 @@ func appendSignature(request *http.Request, body *[]byte, KeyID string, publicKe
func sendActivity(inboxURL string, KeyID string, body []byte, publicKey *rsa.PrivateKey) error {
req, _ := http.NewRequest("POST", inboxURL, bytes.NewBuffer(body))
req.Header.Set("Content-Type", "application/activity+json")
req.Header.Set("User-Agent", fmt.Sprintf("%s (golang net/http; Activity-Relay %s; %s)", viper.GetString("relay_servicename"), version, hostURL.Host))
req.Header.Set("User-Agent", fmt.Sprintf("%s (golang net/http; Activity-Relay %s; %s)", globalConfig.ServerServicename(), version, globalConfig.ServerHostname().Host))
req.Header.Set("Date", httpdate.Time2Str(time.Now()))
appendSignature(req, &body, KeyID, publicKey)
resp, err := httpClient.Do(req)

26
main.go
View File

@ -1,9 +1,15 @@
/*
Yet another powerful customizable ActivityPub relay server written in Go.
Run Activity-Relay
API Server
./Activity-Relay -c <Path of config file> server
Job Worker
./Activity-Relay -c <Path of config file> worker
Config
YAML Format
ACTOR_PEM: actor.pem
REDIS_URL: redis://localhost:6379
@ -16,6 +22,7 @@ YAML Format
RELAY_ICON: https://example.com/example_icon.png
RELAY_IMAGE: https://example.com/example_image.png
Environment Variable
This is Optional : When config file not exist, use environment variables.
- ACTOR_PEM
- REDIS_URL
@ -26,6 +33,7 @@ This is Optional : When config file not exist, use environment variables.
- RELAY_SUMMARY
- RELAY_ICON
- RELAY_IMAGE
*/
package main
@ -36,6 +44,7 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/yukimochi/Activity-Relay/api"
"github.com/yukimochi/Activity-Relay/deliver"
"github.com/yukimochi/Activity-Relay/models"
)
@ -69,11 +78,28 @@ func buildCommand() *cobra.Command {
},
}
var worker = &cobra.Command{
Use: "worker",
Short: "Activity-Relay Job Worker",
Long: "Activity-Relay Job Worker is providing ActivityPub Activity deliverer",
RunE: func(cmd *cobra.Command, args []string) error {
initConfig(cmd, args)
fmt.Println(globalConfig.DumpWelcomeMessage("Job Worker", version))
err := deliver.Entrypoint(globalConfig, version)
if err != nil {
fmt.Println(err.Error())
os.Exit(1)
}
return nil
},
}
var app = &cobra.Command{
Short: "YUKIMOCHI Activity-Relay",
Long: "YUKIMOCHI Activity-Relay - ActivityPub Relay Server",
}
app.AddCommand(server)
app.AddCommand(worker)
return app
}

View File

@ -24,6 +24,7 @@ type RelayConfig struct {
serviceSummary string
serviceIconURL *url.URL
serviceImageURL *url.URL
jobConcurrency int
}
// NewRelayConfig create valid RelayConfig from viper configuration. If invalid configuration detected, return error.
@ -45,6 +46,11 @@ func NewRelayConfig() (*RelayConfig, error) {
imageURL = nil
}
jobConcurrency := viper.GetInt("JOB_CONCURRENCY")
if jobConcurrency < 1 {
return nil, errors.New("JOB_CONCURRENCY IS 0 OR EMPTY. SHOULD BE MORE THAN 1")
}
privateKey, err := readPrivateKeyRSA(viper.GetString("ACTOR_PEM"))
if err != nil {
return nil, errors.New("ACTOR_PEM: " + err.Error())
@ -73,6 +79,7 @@ func NewRelayConfig() (*RelayConfig, error) {
serviceSummary: viper.GetString("RELAY_SUMMARY"),
serviceIconURL: iconURL,
serviceImageURL: imageURL,
jobConcurrency: jobConcurrency,
}, nil
}
@ -86,11 +93,21 @@ func (relayConfig *RelayConfig) ServerHostname() *url.URL {
return relayConfig.domain
}
// ServerHostname is API Server's hostname definition.
// ServerHostname is API Server's servername definition.
func (relayConfig *RelayConfig) ServerServicename() string {
return relayConfig.serviceName
}
// JobConcurrency is API Worker's jobConcurrency definition.
func (relayConfig *RelayConfig) JobConcurrency() int {
return relayConfig.jobConcurrency
}
// ActorKey is API Worker's HTTPSignature private key.
func (relayConfig *RelayConfig) ActorKey() *rsa.PrivateKey {
return relayConfig.actorKey
}
// CreateRedisClient is create new redis client from RelayConfig.
func (relayConfig *RelayConfig) RedisClient() *redis.Client {
return relayConfig.redisClient

View File

@ -11,6 +11,8 @@
## Packages
- `github.com/yukimochi/Activity-Relay`
- `github.com/yukimochi/Activity-Relay/api`
- `github.com/yukimochi/Activity-Relay/deliver`
- `github.com/yukimochi/Activity-Relay/worker`
- `github.com/yukimochi/Activity-Relay/cli`

View File

@ -1,121 +0,0 @@
package main
import (
"crypto/rsa"
"fmt"
"net/http"
"net/url"
"os"
"time"
"github.com/RichardKnop/machinery/v1"
"github.com/RichardKnop/machinery/v1/config"
"github.com/RichardKnop/machinery/v1/log"
"github.com/go-redis/redis"
uuid "github.com/satori/go.uuid"
"github.com/spf13/viper"
keyloader "github.com/yukimochi/Activity-Relay/KeyLoader"
"github.com/yukimochi/Activity-Relay/models"
)
var (
version string
// Actor : Relay's Actor
Actor models.Actor
hostURL *url.URL
hostPrivatekey *rsa.PrivateKey
redisClient *redis.Client
machineryServer *machinery.Server
httpClient *http.Client
)
func relayActivity(args ...string) error {
inboxURL := args[0]
body := args[1]
err := sendActivity(inboxURL, Actor.ID, []byte(body), hostPrivatekey)
if err != nil {
domain, _ := url.Parse(inboxURL)
mod, _ := redisClient.HSetNX("relay:statistics:"+domain.Host, "last_error", err.Error()).Result()
if mod {
redisClient.Expire("relay:statistics:"+domain.Host, time.Duration(time.Minute))
}
}
return err
}
func registorActivity(args ...string) error {
inboxURL := args[0]
body := args[1]
err := sendActivity(inboxURL, Actor.ID, []byte(body), hostPrivatekey)
return err
}
func initConfig() {
viper.SetConfigName("config")
viper.AddConfigPath(".")
err := viper.ReadInConfig()
if err != nil {
fmt.Println("Config file is not exists. Use environment variables.")
viper.BindEnv("actor_pem")
viper.BindEnv("redis_url")
viper.BindEnv("relay_bind")
viper.BindEnv("relay_domain")
viper.BindEnv("relay_servicename")
viper.BindEnv("job_concurrency")
} else {
Actor.Summary = viper.GetString("relay_summary")
Actor.Icon = &models.Image{URL: viper.GetString("relay_icon")}
Actor.Image = &models.Image{URL: viper.GetString("relay_image")}
}
Actor.Name = viper.GetString("relay_servicename")
hostURL, _ = url.Parse("https://" + viper.GetString("relay_domain"))
hostPrivatekey, _ = keyloader.ReadPrivateKeyRSAfromPath(viper.GetString("actor_pem"))
redisOption, err := redis.ParseURL(viper.GetString("redis_url"))
if err != nil {
panic(err)
}
redisClient = redis.NewClient(redisOption)
machineryConfig := &config.Config{
Broker: viper.GetString("redis_url"),
DefaultQueue: "relay",
ResultBackend: viper.GetString("redis_url"),
ResultsExpireIn: 5,
}
machineryServer, err = machinery.NewServer(machineryConfig)
if err != nil {
panic(err)
}
httpClient = &http.Client{Timeout: time.Duration(5) * time.Second}
Actor.GenerateSelfKey(hostURL, &hostPrivatekey.PublicKey)
newNullLogger := NewNullLogger()
log.DEBUG = newNullLogger
fmt.Println("Welcome to YUKIMOCHI Activity-Relay [Worker]", version)
fmt.Println(" - Configurations")
fmt.Println("RELAY DOMAIN : ", hostURL.Host)
fmt.Println("REDIS URL : ", viper.GetString("redis_url"))
}
func main() {
initConfig()
err := machineryServer.RegisterTask("registor", registorActivity)
if err != nil {
panic(err.Error())
}
err = machineryServer.RegisterTask("relay", relayActivity)
if err != nil {
panic(err.Error())
}
workerID := uuid.NewV4()
worker := machineryServer.NewWorker(workerID.String(), viper.GetInt("job_concurrency"))
err = worker.Launch()
if err != nil {
fmt.Fprintln(os.Stderr, err)
}
}