Skip to content

Commit 5fd044e

Browse files
authored
cmd/sips, cmd/sipsctl: implement (#1)
* internal/log: add package * cmd/sips: begin implementation * cmd/sips: remove `Error: ` prefix * ipfsapi: finish implementation of `Client.PinLS()` * go.mod: add some dependencies * cmd/sips: open database and then fail at everything * dbutil: create package for dealing with the database * cmd/sips: try to authenticate users when adding pins * dbutil: check if buckets are nil when recursing through them in `FindPath()` * cmd/sips: add `AuthError.Unwrap()` * go.mod: update some dependencies * dbutil: overhaul pretty much everything * internal/log: fix a misusage of a logger * cmd/sips: fix API breakage * dbutil: fix a few problems and start implementing some more stuff * all: switch from raw `bbolt` to `storm` * cmd/sips: better logging of errors * cmd/sips: remove `AuthError` * cmd/sipsctl: begin implementation * internal/cli: move `$CONFIG` expansion to its own package * cmd/sips: use new `internal/cli` package * dbs: increment pin IDs * cmd/sipsctl: implement `token add` * cmd/sipsctl: begin working on user subcommands * dbs: fix indices for the schema * cmd/sipsctl: implement adding and listing users * cmd/sips: fix incorrect function call * cmd/sipsctl: add `token list` * cmd/sipsctl: add `pins list` * dbs: restructure to make pins just point to users Why didn't I just do this before, I wonder? * cmd/sips: partial implementation of adding a pin * cmd/sipsctl: begin implementing `pins sync` * cmd/sipsctl: remove `pins sync` * dbs: add types for a job queue * cmd/sips: fix API breakage and a few vet warnings * dbs: more changes to the schema * cmd/sipsctl: add creation time to stuff in the database * dbs: init `Job` bucket * cmd/sips: begin implementing the job queue * ipfsapi: add a few more functions * cmd/sips: finish basic implementation of adding a pin * dbs: remove some more job-related stuff * cmd/sips: partially implement getting a list of pins * sips: fix handling of empty list of CIDs when listing pins * cmd/sips: implement pin filtering when listing pins * cmd/sips: handle authentication a bit better * ipfsapi: add pin removal * cmd/sips: implement pin deletion * internal/ipfsapi: move from `ipfsapi` * cmd/sips: fix API breakage * internal/ipfsapi: add ability to update pins * cmd/sips: implement the last two `PinHandler` methods * sips: handle more error statuses * sips: try not returning any results array if there aren't any pins * Revert "sips: try not returning any results array if there aren't any pins" This reverts commit a376f1d. * sips: fix `Content-Type` in responses and refactor token handling * internal/ipfsapi: check response status * internal/ipfsapi: add context support * cmd/sips: fix API breakage * sips: don't use `http.Error()` * internal/ipfsapi: fix double slash in URL * internal/ipfsapi: fix double slash in API URLs * internal/ipfsapi: fix status code check * internal/ipfsapi: disable progress streaming * cmd/sipsctl: add `pins rm` * cmd/sipsctl: add `pins add` * cmd/sipsctl: add `tokens rm` * cmd/sipsctl: add `users rm` * cmd/sipsctl: more usage of transactions
1 parent 00532c4 commit 5fd044e

File tree

16 files changed

+1644
-144
lines changed

16 files changed

+1644
-144
lines changed

cmd/sips/pinhandler.go

Lines changed: 339 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,339 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"io/fs"
8+
"strconv"
9+
"time"
10+
11+
"github.com/DeedleFake/sips"
12+
"github.com/DeedleFake/sips/dbs"
13+
"github.com/DeedleFake/sips/internal/ipfsapi"
14+
"github.com/DeedleFake/sips/internal/log"
15+
"github.com/asdine/storm"
16+
)
17+
18+
type PinHandler struct {
19+
IPFS *ipfsapi.Client
20+
DB *storm.DB
21+
}
22+
23+
func (h PinHandler) Pins(ctx context.Context, query sips.PinQuery) ([]sips.PinStatus, error) {
24+
tx, err := h.DB.Begin(false)
25+
if err != nil {
26+
log.Errorf("begin transaction: %v", err)
27+
return nil, err
28+
}
29+
defer tx.Rollback()
30+
31+
user, err := auth(ctx, tx)
32+
if err != nil {
33+
log.Errorf("authenticate: %v", err)
34+
return nil, err
35+
}
36+
37+
selector := tx.Select(&queryMatcher{Query: query})
38+
if query.Limit > 0 {
39+
selector = selector.Limit(query.Limit)
40+
}
41+
42+
var dbpins []dbs.Pin
43+
err = selector.OrderBy("Created").Find(&dbpins)
44+
if (err != nil) && (!errors.Is(err, storm.ErrNotFound)) {
45+
log.Errorf("find pins for %v: %v", user.Name, err)
46+
return nil, err
47+
}
48+
49+
pins := make([]sips.PinStatus, 0, len(dbpins))
50+
for _, pin := range dbpins {
51+
pins = append(pins, sips.PinStatus{
52+
RequestID: strconv.FormatUint(pin.ID, 16),
53+
Status: sips.Pinned, // TODO: Handle this properly.
54+
Created: pin.Created,
55+
Pin: sips.Pin{
56+
CID: pin.CID,
57+
Name: pin.Name,
58+
},
59+
})
60+
}
61+
62+
return pins, nil
63+
}
64+
65+
func (h PinHandler) AddPin(ctx context.Context, pin sips.Pin) (sips.PinStatus, error) {
66+
tx, err := h.DB.Begin(true)
67+
if err != nil {
68+
log.Errorf("begin transaction: %v", err)
69+
return sips.PinStatus{}, err
70+
}
71+
defer tx.Rollback()
72+
73+
user, err := auth(ctx, tx)
74+
if err != nil {
75+
log.Errorf("authenticate: %v", err)
76+
return sips.PinStatus{}, err
77+
}
78+
79+
dbpin := dbs.Pin{
80+
Created: time.Now(),
81+
User: user.ID,
82+
Name: pin.Name,
83+
CID: pin.CID,
84+
}
85+
err = tx.Save(&dbpin)
86+
if err != nil {
87+
log.Errorf("save pin %q: %v", pin.CID, err)
88+
return sips.PinStatus{}, err
89+
}
90+
91+
if len(pin.Origins) != 0 {
92+
for _, origin := range pin.Origins {
93+
go h.IPFS.SwarmConnect(ctx, origin)
94+
}
95+
}
96+
97+
_, err = h.IPFS.PinAdd(ctx, pin.CID)
98+
if err != nil {
99+
log.Errorf("add pin %v: %v", pin.CID, err)
100+
return sips.PinStatus{}, err
101+
}
102+
103+
id, err := h.IPFS.ID(ctx)
104+
if err != nil {
105+
log.Errorf("get IPFS id: %v", err)
106+
// Purposefully don't return here.
107+
}
108+
109+
return sips.PinStatus{
110+
RequestID: strconv.FormatUint(dbpin.ID, 16),
111+
Status: sips.Pinning,
112+
Created: dbpin.Created,
113+
Delegates: id.Addresses,
114+
Pin: pin,
115+
}, tx.Commit()
116+
}
117+
118+
func (h PinHandler) GetPin(ctx context.Context, requestID string) (sips.PinStatus, error) {
119+
pinID, err := strconv.ParseUint(requestID, 16, 64)
120+
if err != nil {
121+
log.Errorf("parse request ID %q: %v", requestID, err)
122+
return sips.PinStatus{}, err
123+
}
124+
125+
tx, err := h.DB.Begin(false)
126+
if err != nil {
127+
log.Errorf("begin transaction: %v", err)
128+
return sips.PinStatus{}, err
129+
}
130+
defer tx.Rollback()
131+
132+
user, err := auth(ctx, tx)
133+
if err != nil {
134+
log.Errorf("authenticate: %v", err)
135+
return sips.PinStatus{}, err
136+
}
137+
138+
var pin dbs.Pin
139+
err = tx.One("ID", pinID, &pin)
140+
if err != nil {
141+
log.Errorf("find pin %v: %v", requestID, err)
142+
return sips.PinStatus{}, err
143+
}
144+
145+
if pin.User != user.ID {
146+
log.Errorf("user %v is not authorized to see pin %v", user.Name, requestID)
147+
return sips.PinStatus{}, fs.ErrPermission
148+
}
149+
150+
return sips.PinStatus{
151+
RequestID: requestID,
152+
Status: sips.Pinned,
153+
Created: pin.Created,
154+
Pin: sips.Pin{
155+
CID: pin.CID,
156+
Name: pin.Name,
157+
},
158+
}, nil
159+
}
160+
161+
func (h PinHandler) UpdatePin(ctx context.Context, requestID string, pin sips.Pin) (sips.PinStatus, error) {
162+
pinID, err := strconv.ParseUint(requestID, 16, 64)
163+
if err != nil {
164+
log.Errorf("parse request ID: %q: %v", requestID, err)
165+
return sips.PinStatus{}, err
166+
}
167+
168+
tx, err := h.DB.Begin(true)
169+
if err != nil {
170+
log.Errorf("begin transaction: %v", err)
171+
return sips.PinStatus{}, err
172+
}
173+
defer tx.Rollback()
174+
175+
user, err := auth(ctx, tx)
176+
if err != nil {
177+
log.Errorf("authenticate: %v", err)
178+
return sips.PinStatus{}, err
179+
}
180+
181+
var dbpin dbs.Pin
182+
err = tx.One("ID", pinID, &dbpin)
183+
if err != nil {
184+
log.Errorf("find pin %v: %v", requestID, err)
185+
return sips.PinStatus{}, err
186+
}
187+
oldCID := dbpin.CID
188+
189+
if dbpin.User != user.ID {
190+
log.Errorf("user %v not allowed to update pin %v", user.Name, requestID)
191+
return sips.PinStatus{}, fs.ErrPermission
192+
}
193+
194+
dbpin.Name = pin.Name
195+
dbpin.CID = pin.CID
196+
err = tx.Update(&dbpin)
197+
if err != nil {
198+
log.Errorf("update pin %v: %v", requestID, err)
199+
return sips.PinStatus{}, err
200+
}
201+
202+
if len(pin.Origins) != 0 {
203+
for _, origin := range pin.Origins {
204+
go h.IPFS.SwarmConnect(ctx, origin)
205+
}
206+
}
207+
208+
_, err = h.IPFS.PinUpdate(ctx, oldCID, pin.CID, false)
209+
if err != nil {
210+
log.Errorf("add pin %v: %v", pin.CID, err)
211+
return sips.PinStatus{}, err
212+
}
213+
214+
id, err := h.IPFS.ID(ctx)
215+
if err != nil {
216+
log.Errorf("get IPFS id: %v", err)
217+
// Purposefully don't return here.
218+
}
219+
220+
return sips.PinStatus{
221+
RequestID: requestID,
222+
Status: sips.Pinning,
223+
Created: dbpin.Created,
224+
Delegates: id.Addresses,
225+
Pin: sips.Pin{
226+
CID: pin.CID,
227+
Name: pin.Name,
228+
},
229+
}, tx.Commit()
230+
}
231+
232+
func (h PinHandler) DeletePin(ctx context.Context, requestID string) error {
233+
pinID, err := strconv.ParseUint(requestID, 16, 64)
234+
if err != nil {
235+
log.Errorf("parse request ID %q: %v", requestID, err)
236+
return err
237+
}
238+
239+
tx, err := h.DB.Begin(true)
240+
if err != nil {
241+
log.Errorf("begin transaction: %v", err)
242+
return err
243+
}
244+
defer tx.Rollback()
245+
246+
user, err := auth(ctx, tx)
247+
if err != nil {
248+
log.Errorf("authenticate: %v", err)
249+
return err
250+
}
251+
252+
var pin dbs.Pin
253+
err = tx.One("ID", pinID, &pin)
254+
if err != nil {
255+
log.Errorf("find pin %v: %v", requestID, err)
256+
return err
257+
}
258+
259+
if pin.User != user.ID {
260+
log.Errorf("user %v is not authorized to delete pin %v", user.Name, requestID)
261+
return fs.ErrPermission // TODO: That's just not right.
262+
}
263+
264+
err = tx.DeleteStruct(&pin)
265+
if err != nil {
266+
log.Errorf("delete pin %v: %v", requestID, err)
267+
return err
268+
}
269+
270+
_, err = h.IPFS.PinRm(ctx, pin.CID)
271+
if err != nil {
272+
log.Errorf("unpin %v: %v", pin.CID, err)
273+
return err
274+
}
275+
276+
return tx.Commit()
277+
}
278+
279+
func auth(ctx context.Context, db storm.Node) (user dbs.User, err error) {
280+
tokID, _ := sips.Token(ctx)
281+
282+
var tok dbs.Token
283+
err = db.One("ID", tokID, &tok)
284+
if err != nil {
285+
return dbs.User{}, fmt.Errorf("find token %v: %w", tokID, err)
286+
}
287+
288+
err = db.One("ID", tok.User, &user)
289+
if err != nil {
290+
return dbs.User{}, fmt.Errorf("find user: %w", err)
291+
}
292+
293+
return user, nil
294+
}
295+
296+
type queryMatcher struct {
297+
Query sips.PinQuery
298+
}
299+
300+
func (qm queryMatcher) Match(v interface{}) (bool, error) {
301+
pin, ok := v.(dbs.Pin)
302+
if !ok {
303+
return false, fmt.Errorf("expected pin, not %T", v)
304+
}
305+
306+
if len(qm.Query.CID) != 0 {
307+
var found bool
308+
for _, cid := range qm.Query.CID {
309+
if cid == pin.CID {
310+
found = true
311+
break
312+
}
313+
}
314+
if !found {
315+
return false, nil
316+
}
317+
}
318+
319+
if qm.Query.Name != "" {
320+
if !qm.Query.Match.Match(pin.Name, qm.Query.Name) {
321+
return false, nil
322+
}
323+
}
324+
325+
// TODO: Handle statuses.
326+
327+
if !qm.Query.Before.IsZero() {
328+
if !pin.Created.After(qm.Query.Before) {
329+
return false, nil
330+
}
331+
}
332+
if !qm.Query.After.IsZero() {
333+
if !pin.Created.Before(qm.Query.After) {
334+
return false, nil
335+
}
336+
}
337+
338+
return true, nil
339+
}

0 commit comments

Comments
 (0)