diff --git a/api/api.go b/api/api.go index 9a5511a..4de3177 100644 --- a/api/api.go +++ b/api/api.go @@ -3,7 +3,6 @@ package api import ( "fmt" "net/http" - "net/url" "time" "github.com/RichardKnop/machinery/v1" @@ -24,7 +23,6 @@ var ( // Nodeinfo : Relay's Nodeinfo Nodeinfo models.NodeinfoResources - hostURL *url.URL relayState models.RelayState machineryServer *machinery.Server actorCache *cache.Cache @@ -66,9 +64,8 @@ func initialize(globalConfig *models.RelayConfig) error { Actor = models.NewActivityPubActorFromSelfKey(globalConfig) actorCache = cache.New(5*time.Minute, 10*time.Minute) - hostURL = globalConfig.ServerHostname() - WebfingerResource.GenerateFromActor(hostURL, &Actor) - Nodeinfo.GenerateFromActor(hostURL, &Actor, version) + WebfingerResource.GenerateFromActor(globalConfig.ServerHostname(), &Actor) + Nodeinfo.GenerateFromActor(globalConfig.ServerHostname(), &Actor, version) return nil } diff --git a/api/decode.go b/api/decode.go index 5edb274..99ec81e 100644 --- a/api/decode.go +++ b/api/decode.go @@ -26,7 +26,7 @@ func decodeActivity(request *http.Request) (*models.Activity, *models.Actor, []b } KeyID := verifier.KeyId() keyOwnerActor := new(models.Actor) - err = keyOwnerActor.RetrieveRemoteActor(KeyID, fmt.Sprintf("%s (golang net/http; Activity-Relay %s; %s)", globalConfig.ServerServicename(), version, hostURL.Host), actorCache) + err = keyOwnerActor.RetrieveRemoteActor(KeyID, fmt.Sprintf("%s (golang net/http; Activity-Relay %s; %s)", globalConfig.ServerServicename(), version, globalConfig.ServerHostname().Host), actorCache) if err != nil { return nil, nil, nil, err } @@ -61,7 +61,7 @@ func decodeActivity(request *http.Request) (*models.Activity, *models.Actor, []b } var remoteActor models.Actor - err = remoteActor.RetrieveRemoteActor(activity.Actor, fmt.Sprintf("%s (golang net/http; Activity-Relay %s; %s)", globalConfig.ServerServicename(), version, hostURL.Host), actorCache) + err = remoteActor.RetrieveRemoteActor(activity.Actor, fmt.Sprintf("%s (golang net/http; Activity-Relay %s; %s)", globalConfig.ServerServicename(), version, globalConfig.ServerHostname().Host), actorCache) if err != nil { return nil, nil, nil, err } diff --git a/api/handle.go b/api/handle.go index 2d79665..ea4a562 100644 --- a/api/handle.go +++ b/api/handle.go @@ -214,7 +214,7 @@ func handleInbox(writer http.ResponseWriter, request *http.Request, activityDeco case "Follow": err = followAcceptable(activity, actor) if err != nil { - resp := activity.GenerateResponse(hostURL, "Reject") + resp := activity.GenerateResponse(globalConfig.ServerHostname(), "Reject") jsonData, _ := json.Marshal(&resp) go pushRegistorJob(actor.Inbox, jsonData) fmt.Println("Reject Follow Request : ", err.Error(), activity.Actor) @@ -233,7 +233,7 @@ func handleInbox(writer http.ResponseWriter, request *http.Request, activityDeco }) fmt.Println("Pending Follow Request : ", activity.Actor) } else { - resp := activity.GenerateResponse(hostURL, "Accept") + resp := activity.GenerateResponse(globalConfig.ServerHostname(), "Accept") jsonData, _ := json.Marshal(&resp) go pushRegistorJob(actor.Inbox, jsonData) relayState.AddSubscription(models.Subscription{ @@ -245,7 +245,7 @@ func handleInbox(writer http.ResponseWriter, request *http.Request, activityDeco fmt.Println("Accept Follow Request : ", activity.Actor) } } else { - resp := activity.GenerateResponse(hostURL, "Reject") + resp := activity.GenerateResponse(globalConfig.ServerHostname(), "Reject") jsonData, _ := json.Marshal(&resp) go pushRegistorJob(actor.Inbox, jsonData) fmt.Println("Reject Follow Request : ", activity.Actor) @@ -297,7 +297,7 @@ func handleInbox(writer http.ResponseWriter, request *http.Request, activityDeco } switch nestedObject.Type { case "Note": - resp := nestedObject.GenerateAnnounce(hostURL) + resp := nestedObject.GenerateAnnounce(globalConfig.ServerHostname()) jsonData, _ := json.Marshal(&resp) go pushRelayJob(domain.Host, jsonData) fmt.Println("Accept Announce Note : ", activity.Actor) diff --git a/api/handle_test.go b/api/handle_test.go index 19c84b4..7880654 100644 --- a/api/handle_test.go +++ b/api/handle_test.go @@ -26,7 +26,7 @@ func TestHandleWebfingerGet(t *testing.T) { req, _ := http.NewRequest("GET", s.URL, nil) q := req.URL.Query() - q.Add("resource", "acct:relay@"+hostURL.Host) + q.Add("resource", "acct:relay@"+globalConfig.ServerHostname().Host) req.URL.RawQuery = q.Encode() client := new(http.Client) r, err := client.Do(req) @@ -49,7 +49,7 @@ func TestHandleWebfingerGet(t *testing.T) { } domain, _ := url.Parse(wfresource.Links[0].Href) - if domain.Host != hostURL.Host { + if domain.Host != globalConfig.ServerHostname().Host { t.Fatalf("WebfingerResource's Host not valid.") } } @@ -193,7 +193,7 @@ func TestHandleActorGet(t *testing.T) { } domain, _ := url.Parse(actor.ID) - if domain.Host != hostURL.Host { + if domain.Host != globalConfig.ServerHostname().Host { t.Fatalf("Actor's Host not valid.") } } diff --git a/deliver/deriver.go b/deliver/deriver.go new file mode 100644 index 0000000..93766b9 --- /dev/null +++ b/deliver/deriver.go @@ -0,0 +1,93 @@ +package deliver + +import ( + "fmt" + "net/http" + "net/url" + "os" + "time" + + "github.com/RichardKnop/machinery/v1" + "github.com/RichardKnop/machinery/v1/log" + "github.com/go-redis/redis" + uuid "github.com/satori/go.uuid" + "github.com/yukimochi/Activity-Relay/models" +) + +var ( + version string + globalConfig *models.RelayConfig + + // Actor : Relay's Actor + Actor models.Actor + + redisClient *redis.Client + machineryServer *machinery.Server + httpClient *http.Client +) + +func relayActivity(args ...string) error { + inboxURL := args[0] + body := args[1] + err := sendActivity(inboxURL, Actor.ID, []byte(body), globalConfig.ActorKey()) + if err != nil { + domain, _ := url.Parse(inboxURL) + eval_script := "local change = redis.call('HSETNX',KEYS[1], 'last_error', ARGV[1]); if change == 1 then redis.call('EXPIRE', KEYS[1], ARGV[2]) end;" + redisClient.Eval(eval_script, []string{"relay:statistics:" + domain.Host}, err.Error(), 60).Result() + } + return err +} + +func registorActivity(args ...string) error { + inboxURL := args[0] + body := args[1] + err := sendActivity(inboxURL, Actor.ID, []byte(body), globalConfig.ActorKey()) + return err +} + +func Entrypoint(g *models.RelayConfig, v string) error { + var err error + globalConfig = g + version = v + + err = initialize(globalConfig) + if err != nil { + return err + } + + err = machineryServer.RegisterTask("registor", registorActivity) + if err != nil { + return err + } + err = machineryServer.RegisterTask("relay", relayActivity) + if err != nil { + return err + } + + workerID := uuid.NewV4() + worker := machineryServer.NewWorker(workerID.String(), globalConfig.JobConcurrency()) + err = worker.Launch() + if err != nil { + fmt.Fprintln(os.Stderr, err) + } + + return nil +} + +func initialize(globalConfig *models.RelayConfig) error { + var err error + + redisClient = globalConfig.RedisClient() + + machineryServer, err = models.NewMachineryServer(globalConfig) + if err != nil { + return err + } + httpClient = &http.Client{Timeout: time.Duration(5) * time.Second} + + Actor = models.NewActivityPubActorFromSelfKey(globalConfig) + newNullLogger := NewNullLogger() + log.DEBUG = newNullLogger + + return nil +} diff --git a/worker/worker_test.go b/deliver/deriver_test.go similarity index 78% rename from worker/worker_test.go rename to deliver/deriver_test.go index dfd9d28..56af7bb 100644 --- a/worker/worker_test.go +++ b/deliver/deriver_test.go @@ -1,6 +1,7 @@ -package main +package deliver import ( + "fmt" "io/ioutil" "net/http" "net/http/httptest" @@ -9,15 +10,33 @@ import ( "testing" "github.com/spf13/viper" + "github.com/yukimochi/Activity-Relay/models" ) func TestMain(m *testing.M) { - viper.Set("actor_pem", "../misc/testKey.pem") - viper.Set("relay_domain", "relay.yukimochi.example.org") - initConfig() - redisClient.FlushAll().Result() + var err error - // Load Config + testConfigPath := "../misc/config.yml" + file, _ := os.Open(testConfigPath) + defer file.Close() + + viper.SetConfigType("yaml") + viper.ReadConfig(file) + viper.Set("ACTOR_PEM", "../misc/testKey.pem") + viper.BindEnv("REDIS_URL") + + globalConfig, err = models.NewRelayConfig() + if err != nil { + fmt.Println(err.Error()) + os.Exit(1) + } + + err = initialize(globalConfig) + if err != nil { + fmt.Println(err.Error()) + os.Exit(1) + } + redisClient.FlushAll().Result() code := m.Run() os.Exit(code) } @@ -52,7 +71,7 @@ func TestRelayActivityNoHost(t *testing.T) { t.Fatal("Failed - Error not reported.") } domain, _ := url.Parse("http://nohost.example.jp") - data, err := redisClient.HGet("relay:statistics:"+domain.Host, "last_error").Result() + data, _ := redisClient.HGet("relay:statistics:"+domain.Host, "last_error").Result() if data == "" { t.Fatal("Failed - Error not cached.") } @@ -70,7 +89,7 @@ func TestRelayActivityResp500(t *testing.T) { t.Fatal("Failed - Error not reported.") } domain, _ := url.Parse(s.URL) - data, err := redisClient.HGet("relay:statistics:"+domain.Host, "last_error").Result() + data, _ := redisClient.HGet("relay:statistics:"+domain.Host, "last_error").Result() if data == "" { t.Fatal("Failed - Error not cached.") } diff --git a/worker/logger.go b/deliver/logger.go similarity index 97% rename from worker/logger.go rename to deliver/logger.go index 234cd43..16dbfe3 100644 --- a/worker/logger.go +++ b/deliver/logger.go @@ -1,4 +1,4 @@ -package main +package deliver // NullLogger : Null logger for debug output type NullLogger struct { diff --git a/worker/sender.go b/deliver/sender.go similarity index 91% rename from worker/sender.go rename to deliver/sender.go index 8028902..5391bd1 100644 --- a/worker/sender.go +++ b/deliver/sender.go @@ -1,4 +1,4 @@ -package main +package deliver import ( "bytes" @@ -11,7 +11,6 @@ import ( "time" httpdate "github.com/Songmu/go-httpdate" - "github.com/spf13/viper" "github.com/yukimochi/httpsig" ) @@ -36,7 +35,7 @@ func appendSignature(request *http.Request, body *[]byte, KeyID string, publicKe func sendActivity(inboxURL string, KeyID string, body []byte, publicKey *rsa.PrivateKey) error { req, _ := http.NewRequest("POST", inboxURL, bytes.NewBuffer(body)) req.Header.Set("Content-Type", "application/activity+json") - req.Header.Set("User-Agent", fmt.Sprintf("%s (golang net/http; Activity-Relay %s; %s)", viper.GetString("relay_servicename"), version, hostURL.Host)) + req.Header.Set("User-Agent", fmt.Sprintf("%s (golang net/http; Activity-Relay %s; %s)", globalConfig.ServerServicename(), version, globalConfig.ServerHostname().Host)) req.Header.Set("Date", httpdate.Time2Str(time.Now())) appendSignature(req, &body, KeyID, publicKey) resp, err := httpClient.Do(req) diff --git a/main.go b/main.go index 4c88e3e..900e050 100644 --- a/main.go +++ b/main.go @@ -1,9 +1,15 @@ /* Yet another powerful customizable ActivityPub relay server written in Go. + Run Activity-Relay + API Server ./Activity-Relay -c server +Job Worker + ./Activity-Relay -c worker + Config + YAML Format ACTOR_PEM: actor.pem REDIS_URL: redis://localhost:6379 @@ -16,6 +22,7 @@ YAML Format RELAY_ICON: https://example.com/example_icon.png RELAY_IMAGE: https://example.com/example_image.png Environment Variable + This is Optional : When config file not exist, use environment variables. - ACTOR_PEM - REDIS_URL @@ -26,6 +33,7 @@ This is Optional : When config file not exist, use environment variables. - RELAY_SUMMARY - RELAY_ICON - RELAY_IMAGE + */ package main @@ -36,6 +44,7 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" "github.com/yukimochi/Activity-Relay/api" + "github.com/yukimochi/Activity-Relay/deliver" "github.com/yukimochi/Activity-Relay/models" ) @@ -69,11 +78,28 @@ func buildCommand() *cobra.Command { }, } + var worker = &cobra.Command{ + Use: "worker", + Short: "Activity-Relay Job Worker", + Long: "Activity-Relay Job Worker is providing ActivityPub Activity deliverer", + RunE: func(cmd *cobra.Command, args []string) error { + initConfig(cmd, args) + fmt.Println(globalConfig.DumpWelcomeMessage("Job Worker", version)) + err := deliver.Entrypoint(globalConfig, version) + if err != nil { + fmt.Println(err.Error()) + os.Exit(1) + } + return nil + }, + } + var app = &cobra.Command{ Short: "YUKIMOCHI Activity-Relay", Long: "YUKIMOCHI Activity-Relay - ActivityPub Relay Server", } app.AddCommand(server) + app.AddCommand(worker) return app } diff --git a/models/config.go b/models/config.go index 5f78cbb..22ca856 100644 --- a/models/config.go +++ b/models/config.go @@ -24,6 +24,7 @@ type RelayConfig struct { serviceSummary string serviceIconURL *url.URL serviceImageURL *url.URL + jobConcurrency int } // NewRelayConfig create valid RelayConfig from viper configuration. If invalid configuration detected, return error. @@ -45,6 +46,11 @@ func NewRelayConfig() (*RelayConfig, error) { imageURL = nil } + jobConcurrency := viper.GetInt("JOB_CONCURRENCY") + if jobConcurrency < 1 { + return nil, errors.New("JOB_CONCURRENCY IS 0 OR EMPTY. SHOULD BE MORE THAN 1") + } + privateKey, err := readPrivateKeyRSA(viper.GetString("ACTOR_PEM")) if err != nil { return nil, errors.New("ACTOR_PEM: " + err.Error()) @@ -73,6 +79,7 @@ func NewRelayConfig() (*RelayConfig, error) { serviceSummary: viper.GetString("RELAY_SUMMARY"), serviceIconURL: iconURL, serviceImageURL: imageURL, + jobConcurrency: jobConcurrency, }, nil } @@ -86,11 +93,21 @@ func (relayConfig *RelayConfig) ServerHostname() *url.URL { return relayConfig.domain } -// ServerHostname is API Server's hostname definition. +// ServerHostname is API Server's servername definition. func (relayConfig *RelayConfig) ServerServicename() string { return relayConfig.serviceName } +// JobConcurrency is API Worker's jobConcurrency definition. +func (relayConfig *RelayConfig) JobConcurrency() int { + return relayConfig.jobConcurrency +} + +// ActorKey is API Worker's HTTPSignature private key. +func (relayConfig *RelayConfig) ActorKey() *rsa.PrivateKey { + return relayConfig.actorKey +} + // CreateRedisClient is create new redis client from RelayConfig. func (relayConfig *RelayConfig) RedisClient() *redis.Client { return relayConfig.redisClient diff --git a/readme.md b/readme.md index a7948e6..cd52cc4 100644 --- a/readme.md +++ b/readme.md @@ -11,6 +11,8 @@ ## Packages - `github.com/yukimochi/Activity-Relay` + - `github.com/yukimochi/Activity-Relay/api` + - `github.com/yukimochi/Activity-Relay/deliver` - `github.com/yukimochi/Activity-Relay/worker` - `github.com/yukimochi/Activity-Relay/cli` diff --git a/worker/worker.go b/worker/worker.go deleted file mode 100644 index a930aae..0000000 --- a/worker/worker.go +++ /dev/null @@ -1,121 +0,0 @@ -package main - -import ( - "crypto/rsa" - "fmt" - "net/http" - "net/url" - "os" - "time" - - "github.com/RichardKnop/machinery/v1" - "github.com/RichardKnop/machinery/v1/config" - "github.com/RichardKnop/machinery/v1/log" - "github.com/go-redis/redis" - uuid "github.com/satori/go.uuid" - "github.com/spf13/viper" - keyloader "github.com/yukimochi/Activity-Relay/KeyLoader" - "github.com/yukimochi/Activity-Relay/models" -) - -var ( - version string - - // Actor : Relay's Actor - Actor models.Actor - - hostURL *url.URL - hostPrivatekey *rsa.PrivateKey - redisClient *redis.Client - machineryServer *machinery.Server - httpClient *http.Client -) - -func relayActivity(args ...string) error { - inboxURL := args[0] - body := args[1] - err := sendActivity(inboxURL, Actor.ID, []byte(body), hostPrivatekey) - if err != nil { - domain, _ := url.Parse(inboxURL) - mod, _ := redisClient.HSetNX("relay:statistics:"+domain.Host, "last_error", err.Error()).Result() - if mod { - redisClient.Expire("relay:statistics:"+domain.Host, time.Duration(time.Minute)) - } - } - return err -} - -func registorActivity(args ...string) error { - inboxURL := args[0] - body := args[1] - err := sendActivity(inboxURL, Actor.ID, []byte(body), hostPrivatekey) - return err -} - -func initConfig() { - viper.SetConfigName("config") - viper.AddConfigPath(".") - err := viper.ReadInConfig() - if err != nil { - fmt.Println("Config file is not exists. Use environment variables.") - viper.BindEnv("actor_pem") - viper.BindEnv("redis_url") - viper.BindEnv("relay_bind") - viper.BindEnv("relay_domain") - viper.BindEnv("relay_servicename") - viper.BindEnv("job_concurrency") - } else { - Actor.Summary = viper.GetString("relay_summary") - Actor.Icon = &models.Image{URL: viper.GetString("relay_icon")} - Actor.Image = &models.Image{URL: viper.GetString("relay_image")} - } - Actor.Name = viper.GetString("relay_servicename") - - hostURL, _ = url.Parse("https://" + viper.GetString("relay_domain")) - hostPrivatekey, _ = keyloader.ReadPrivateKeyRSAfromPath(viper.GetString("actor_pem")) - redisOption, err := redis.ParseURL(viper.GetString("redis_url")) - if err != nil { - panic(err) - } - redisClient = redis.NewClient(redisOption) - machineryConfig := &config.Config{ - Broker: viper.GetString("redis_url"), - DefaultQueue: "relay", - ResultBackend: viper.GetString("redis_url"), - ResultsExpireIn: 5, - } - machineryServer, err = machinery.NewServer(machineryConfig) - if err != nil { - panic(err) - } - httpClient = &http.Client{Timeout: time.Duration(5) * time.Second} - - Actor.GenerateSelfKey(hostURL, &hostPrivatekey.PublicKey) - newNullLogger := NewNullLogger() - log.DEBUG = newNullLogger - - fmt.Println("Welcome to YUKIMOCHI Activity-Relay [Worker]", version) - fmt.Println(" - Configurations") - fmt.Println("RELAY DOMAIN : ", hostURL.Host) - fmt.Println("REDIS URL : ", viper.GetString("redis_url")) -} - -func main() { - initConfig() - - err := machineryServer.RegisterTask("registor", registorActivity) - if err != nil { - panic(err.Error()) - } - err = machineryServer.RegisterTask("relay", relayActivity) - if err != nil { - panic(err.Error()) - } - - workerID := uuid.NewV4() - worker := machineryServer.NewWorker(workerID.String(), viper.GetInt("job_concurrency")) - err = worker.Launch() - if err != nil { - fmt.Fprintln(os.Stderr, err) - } -}