Unverified Commit 478a6f10 authored by Hannah Howard's avatar Hannah Howard Committed by GitHub

Round out diagnostic parameters (#157)

Add a libp2p over http test. Add max request in progress configurable parameter. Add block
diagnostics. Add readme explaining parameters
parent 9f9abf55
# go-graphsync test-plan
### What This Does
This test plan measures a series of transfers between two nodes with graphsync, optionally comparing them to HTTP. It offers a wide variety of configurable parameters, which are documented here.
### File Parameters
These parameters configure the nature of the file that is transfered:
- `size` - size of file to transfer, in human-friendly form
- **Default**: 1MiB
- `chunk_size` - unixfs chunk size (power of 2), controls the size of the leaves in file
- **Default**: 20 *(or 1MB chunks)*
- `links_per_level` - unixfs links per level, controlles UnixFS DAG shape (wide vs deep)
- **Default**: 1024
- `raw_leaves` - should unixfs leaves be raw bytes (true), or wrapped as protonodes (false)
- **Default**: true
- `disk_store` - when we ingest the file to unix fs, should the blockstore where it lives be stored on disk (true) or memory (false)
- **Default**: default - false
- `concurrency` - number of files to construct and attempt to transfer *simultaneously*
- **Default**: 1
Why you might want to change these:
- obviously large file sizes more closely mirror use cases in a typical filecoin data transfer work load
- the links per level, chunk size, and raw leaves allow you to expriment with different dag structures and see how graphsync performs in different conditions
- the disk store allows you to measure the impact of datastore performance
- concurrency allows you to test how graphsync performs under heavy loads of attempting transfer many files simultaneously
### Networking Parameters
These parameters control the parameters for the network layer
- `secure_channel` - type secure encoding for the libp2p channel
- **Default**: "noise"
- `latencies` - list of non-zero latencies to run the test under.
- **Default**: 100ms, 200ms, 300ms
- `no_latency_case` - also run a test case with no latency
- **Default**: true
- `bandwidths` - list limited bandwidths (egress bytes/s) to run the test under (written as humanized sizes).
- **Default**: 10M, 1M, 512kb
- `unlimited_bandwidth_case` - also run a test case with unlimited latency
- **Default**: true
Why you might want to change these:
- we may pay a penalty for the cost of transfering over secure io
- bandwidth and latency parameters allow you to test graphsync under different network conditions. Importantly, these parameters generate a new test case for each instance, in a combinatorial form. So, if you you do two latencies and two bandwidths, you will get 4 rounds. And if concurrency is >1, each round with have more than one transfer
### Graphsync Options
The parameters control values passed constructing graphsync that may affect overall performance. Their default values are the same default values is no value is passed to the graphsync constructor
- `max_memory_per_peer` - the maximum amount of data a responder can buffer in memory for a single peer while it waits for it to be sent out over the wire
- **Default**: 16MB
- `max_memory_total` - the maximum amount of data a responder can buffer in memory for *all peers* while it waits for it to be sent out over the wire
- **Default**: 256MB
- `max_in_progress_requests` - The maximum number of requests Graphsync will respond to at once. When graphsync receives more than this number of simultaneous in bound requests, those after the first six (with a priotization that distributes evenly among peers) will wait for other requests to finish before they beginnin responding.
- **Default**: 6
These performance configuration parameters in GraphSync may cause bottlenecks with their default values. For example if the `concurrency` parameter is greater than 6, the remaining files will block until graphsync finishes some of the first 6. The buffering parameters may artificially lower performance on a fast connection. In a production context, they can be adjusted upwards based on the resources and goals of the graphsync node operator
### HTTP Comparison Parameters
The parameters allow you to compare graphsync performance against transfer of the same data under similar conditions over HTTP
- `compare_http` - run the HTTP comparison test
- **Default**: true
- `use_libp2p_http` - if we run the HTTP comparison test, should we use HTTP over libp2p (true) or just use standard HTTP on top of normal TCP/IP (false)
- **Default**: false
### Diagnostic Parameters
These parameters control what kind of additional diagnostic data the test will generate
- `memory_snapshots` - specifies whether we should take memory snapshots as we run. Has three potention values: *none* (no snapshots), *simple* (take snapshots at the end of each request) and *detailed* (take snap shots every 10 blocks when requests are executing). Note: snapshoting will take a snapshot, then run GC, then take a snapshot again. *detailed* should not be used in any scenario where you are measuring timings
- **Default**: none
- `block_diagnostics` - should we output detailed timings for block operations - blocks queued on the responder, blocks sent out on the network from the responder, responses received on the requestor, and blocks processed on the requestor
- **Default**: false
......@@ -21,8 +21,10 @@ require (
github.com/ipfs/go-unixfs v0.2.4
github.com/ipld/go-ipld-prime v0.5.1-0.20201021195245-109253e8a018
github.com/kr/text v0.2.0 // indirect
github.com/libp2p/go-libp2p v0.12.0
github.com/libp2p/go-libp2p-core v0.7.0
github.com/libp2p/go-libp2p v0.13.0
github.com/libp2p/go-libp2p-core v0.8.0
github.com/libp2p/go-libp2p-gostream v0.3.1
github.com/libp2p/go-libp2p-http v0.2.0
github.com/libp2p/go-libp2p-noise v0.1.1
github.com/libp2p/go-libp2p-secio v0.2.2
github.com/libp2p/go-libp2p-tls v0.1.3
......
......@@ -347,6 +347,8 @@ github.com/libp2p/go-libp2p v0.7.4/go.mod h1:oXsBlTLF1q7pxr+9w6lqzS1ILpyHsaBPniV
github.com/libp2p/go-libp2p v0.8.1/go.mod h1:QRNH9pwdbEBpx5DTJYg+qxcVaDMAz3Ee/qDKwXujH5o=
github.com/libp2p/go-libp2p v0.12.0 h1:+xai9RQnQ9l5elFOKvp5wRyjyWisSwEx+6nU2+onpUA=
github.com/libp2p/go-libp2p v0.12.0/go.mod h1:FpHZrfC1q7nA8jitvdjKBDF31hguaC676g/nT9PgQM0=
github.com/libp2p/go-libp2p v0.13.0 h1:tDdrXARSghmusdm0nf1U/4M8aj8Rr0V2IzQOXmbzQ3s=
github.com/libp2p/go-libp2p v0.13.0/go.mod h1:pM0beYdACRfHO1WcJlp65WXyG2A6NqYM+t2DTVAJxMo=
github.com/libp2p/go-libp2p-autonat v0.1.0/go.mod h1:1tLf2yXxiE/oKGtDwPYWTSYG3PtvYlJmg7NeVtPRqH8=
github.com/libp2p/go-libp2p-autonat v0.1.1/go.mod h1:OXqkeGOY2xJVWKAGV2inNF5aKN/djNA3fdpCWloIudE=
github.com/libp2p/go-libp2p-autonat v0.2.0/go.mod h1:DX+9teU4pEEoZUqR1PiMlqliONQdNbfzE1C718tcViI=
......@@ -383,6 +385,8 @@ github.com/libp2p/go-libp2p-core v0.6.0 h1:u03qofNYTBN+yVg08PuAKylZogVf0xcTEeM8s
github.com/libp2p/go-libp2p-core v0.6.0/go.mod h1:txwbVEhHEXikXn9gfC7/UDDw7rkxuX0bJvM49Ykaswo=
github.com/libp2p/go-libp2p-core v0.7.0 h1:4a0TMjrWNTZlNvcqxZmrMRDi/NQWrhwO2pkTuLSQ/IQ=
github.com/libp2p/go-libp2p-core v0.7.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-core v0.8.0 h1:5K3mT+64qDTKbV3yTdbMCzJ7O6wbNsavAEb8iqBvBcI=
github.com/libp2p/go-libp2p-core v0.8.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-crypto v0.1.0 h1:k9MFy+o2zGDNGsaoZl0MA3iZ75qXxr9OOoAZF+sD5OQ=
github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI=
github.com/libp2p/go-libp2p-discovery v0.1.0/go.mod h1:4F/x+aldVHjHDHuX85x1zWoFTGElt8HnoDzwkFZm29g=
......@@ -390,6 +394,11 @@ github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfx
github.com/libp2p/go-libp2p-discovery v0.3.0/go.mod h1:o03drFnz9BVAZdzC/QUQ+NeQOu38Fu7LJGEOK2gQltw=
github.com/libp2p/go-libp2p-discovery v0.5.0 h1:Qfl+e5+lfDgwdrXdu4YNCWyEo3fWuP+WgN9mN0iWviQ=
github.com/libp2p/go-libp2p-discovery v0.5.0/go.mod h1:+srtPIU9gDaBNu//UHvcdliKBIcr4SfDcm0/PfPJLug=
github.com/libp2p/go-libp2p-gostream v0.3.0/go.mod h1:pLBQu8db7vBMNINGsAwLL/ZCE8wng5V1FThoaE5rNjc=
github.com/libp2p/go-libp2p-gostream v0.3.1 h1:XlwohsPn6uopGluEWs1Csv1QCEjrTXf2ZQagzZ5paAg=
github.com/libp2p/go-libp2p-gostream v0.3.1/go.mod h1:1V3b+u4Zhaq407UUY9JLCpboaeufAeVQbnvAt12LRsI=
github.com/libp2p/go-libp2p-http v0.2.0 h1:GYeVd+RZzkRa8XFLITqOpcrIQG6KbFLPJqII6HHBHzY=
github.com/libp2p/go-libp2p-http v0.2.0/go.mod h1:GlNKFqDZHe25LVy2CvnZKx75/jLtMaD3VxZV6N39X7E=
github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8=
github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90=
github.com/libp2p/go-libp2p-mplex v0.2.0/go.mod h1:Ejl9IyjvXJ0T9iqUTE1jpYATQ9NM3g+OtR+EMMODbKo=
......@@ -399,6 +408,9 @@ github.com/libp2p/go-libp2p-mplex v0.2.3 h1:2zijwaJvpdesST2MXpI5w9wWFRgYtMcpRX7r
github.com/libp2p/go-libp2p-mplex v0.2.3/go.mod h1:CK3p2+9qH9x+7ER/gWWDYJ3QW5ZxWDkm+dVvjfuG3ek=
github.com/libp2p/go-libp2p-mplex v0.3.0 h1:CZyqqKP0BSGQyPLvpRQougbfXaaaJZdGgzhCpJNuNSk=
github.com/libp2p/go-libp2p-mplex v0.3.0/go.mod h1:l9QWxRbbb5/hQMECEb908GbS9Sm2UAR2KFZKUJEynEs=
github.com/libp2p/go-libp2p-mplex v0.4.0/go.mod h1:yCyWJE2sc6TBTnFpjvLuEJgTSw/u+MamvzILKdX7asw=
github.com/libp2p/go-libp2p-mplex v0.4.1 h1:/pyhkP1nLwjG3OM+VuaNJkQT/Pqq73WzB3aDN3Fx1sc=
github.com/libp2p/go-libp2p-mplex v0.4.1/go.mod h1:cmy+3GfqfM1PceHTLL7zQzAAYaryDu6iPSC+CIb094g=
github.com/libp2p/go-libp2p-nat v0.0.4/go.mod h1:N9Js/zVtAXqaeT99cXgTV9e75KpnWCvVOiGzlcHmBbY=
github.com/libp2p/go-libp2p-nat v0.0.5/go.mod h1:1qubaE5bTZMJE+E/uu2URroMbzdubFz1ChgiN79yKPE=
github.com/libp2p/go-libp2p-nat v0.0.6 h1:wMWis3kYynCbHoyKLPBEMu4YRLltbm8Mk08HGSfvTkU=
......@@ -433,6 +445,8 @@ github.com/libp2p/go-libp2p-swarm v0.2.8/go.mod h1:JQKMGSth4SMqonruY0a8yjlPVIkb0
github.com/libp2p/go-libp2p-swarm v0.3.0/go.mod h1:hdv95GWCTmzkgeJpP+GK/9D9puJegb7H57B5hWQR5Kk=
github.com/libp2p/go-libp2p-swarm v0.3.1 h1:UTobu+oQHGdXTOGpZ4RefuVqYoJXcT0EBtSR74m2LkI=
github.com/libp2p/go-libp2p-swarm v0.3.1/go.mod h1:hdv95GWCTmzkgeJpP+GK/9D9puJegb7H57B5hWQR5Kk=
github.com/libp2p/go-libp2p-swarm v0.4.0 h1:hahq/ijRoeH6dgROOM8x7SeaKK5VgjjIr96vdrT+NUA=
github.com/libp2p/go-libp2p-swarm v0.4.0/go.mod h1:XVFcO52VoLoo0eitSxNQWYq4D6sydGOweTOAjJNraCw=
github.com/libp2p/go-libp2p-testing v0.0.2/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E=
github.com/libp2p/go-libp2p-testing v0.0.3/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E=
github.com/libp2p/go-libp2p-testing v0.0.4/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E=
......@@ -441,12 +455,15 @@ github.com/libp2p/go-libp2p-testing v0.1.1/go.mod h1:xaZWMJrPUM5GlDBxCeGUi7kI4eq
github.com/libp2p/go-libp2p-testing v0.1.2-0.20200422005655-8775583591d8/go.mod h1:Qy8sAncLKpwXtS2dSnDOP8ktexIAHKu+J+pnZOFZLTc=
github.com/libp2p/go-libp2p-testing v0.3.0 h1:ZiBYstPamsi7y6NJZebRudUzsYmVkt998hltyLqf8+g=
github.com/libp2p/go-libp2p-testing v0.3.0/go.mod h1:efZkql4UZ7OVsEfaxNHZPzIehtsBXMrXnCfJIgDti5g=
github.com/libp2p/go-libp2p-testing v0.4.0/go.mod h1:Q+PFXYoiYFN5CAEG2w3gLPEzotlKsNSbKQ/lImlOWF0=
github.com/libp2p/go-libp2p-tls v0.1.3 h1:twKMhMu44jQO+HgQK9X8NHO5HkeJu2QbhLzLJpa8oNM=
github.com/libp2p/go-libp2p-tls v0.1.3/go.mod h1:wZfuewxOndz5RTnCAxFliGjvYSDA40sKitV4c50uI1M=
github.com/libp2p/go-libp2p-transport-upgrader v0.1.1/go.mod h1:IEtA6or8JUbsV07qPW4r01GnTenLW4oi3lOPbUMGJJA=
github.com/libp2p/go-libp2p-transport-upgrader v0.2.0/go.mod h1:mQcrHj4asu6ArfSoMuyojOdjx73Q47cYD7s5+gZOlns=
github.com/libp2p/go-libp2p-transport-upgrader v0.3.0 h1:q3ULhsknEQ34eVDhv4YwKS8iet69ffs9+Fir6a7weN4=
github.com/libp2p/go-libp2p-transport-upgrader v0.3.0/go.mod h1:i+SKzbRnvXdVbU3D1dwydnTmKRPXiAR/fyvi1dXuL4o=
github.com/libp2p/go-libp2p-transport-upgrader v0.4.0 h1:xwj4h3hJdBrxqMOyMUjwscjoVst0AASTsKtZiTChoHI=
github.com/libp2p/go-libp2p-transport-upgrader v0.4.0/go.mod h1:J4ko0ObtZSmgn5BX5AmegP+dK3CSnU2lMCKsSq/EY0s=
github.com/libp2p/go-libp2p-yamux v0.2.0/go.mod h1:Db2gU+XfLpm6E4rG5uGCFX6uXA8MEXOxFcRoXUODaK8=
github.com/libp2p/go-libp2p-yamux v0.2.1/go.mod h1:1FBXiHDk1VyRM1C0aez2bCfHQ4vMZKkAQzZbkSQt5fI=
github.com/libp2p/go-libp2p-yamux v0.2.2/go.mod h1:lIohaR0pT6mOt0AZ0L2dFze9hds9Req3OfS+B+dv4qw=
......@@ -456,6 +473,9 @@ github.com/libp2p/go-libp2p-yamux v0.2.8 h1:0s3ELSLu2O7hWKfX1YjzudBKCP0kZ+m9e2+0
github.com/libp2p/go-libp2p-yamux v0.2.8/go.mod h1:/t6tDqeuZf0INZMTgd0WxIRbtK2EzI2h7HbFm9eAKI4=
github.com/libp2p/go-libp2p-yamux v0.4.0 h1:qunEZzWwwmfSBYTtSyd81PlD1TjB5uuWcGYHWVXLbUg=
github.com/libp2p/go-libp2p-yamux v0.4.0/go.mod h1:+DWDjtFMzoAwYLVkNZftoucn7PelNoy5nm3tZ3/Zw30=
github.com/libp2p/go-libp2p-yamux v0.5.0/go.mod h1:AyR8k5EzyM2QN9Bbdg6X1SkVVuqLwTGf0L4DFq9g6po=
github.com/libp2p/go-libp2p-yamux v0.5.1 h1:sX4WQPHMhRxJE5UZTfjEuBvlQWXB5Bo3A2JK9ZJ9EM0=
github.com/libp2p/go-libp2p-yamux v0.5.1/go.mod h1:dowuvDu8CRWmr0iqySMiSxK+W0iL5cMVO9S94Y6gkv4=
github.com/libp2p/go-maddr-filter v0.0.4/go.mod h1:6eT12kSQMA9x2pvFQa+xesMKUBlj9VImZbj3B9FBH/Q=
github.com/libp2p/go-maddr-filter v0.0.5/go.mod h1:Jk+36PMfIqCJhAnaASRH83bdAvfDRp/w6ENFaC9bG+M=
github.com/libp2p/go-maddr-filter v0.1.0/go.mod h1:VzZhTXkMucEGGEOSKddrwGiOv0tUhgnKqNEmIAz/bPU=
......@@ -466,6 +486,8 @@ github.com/libp2p/go-mplex v0.1.2 h1:qOg1s+WdGLlpkrczDqmhYzyk3vCfsQ8+RxRTQjOZWwI
github.com/libp2p/go-mplex v0.1.2/go.mod h1:Xgz2RDCi3co0LeZfgjm4OgUF15+sVR8SRcu3SFXI1lk=
github.com/libp2p/go-mplex v0.2.0 h1:Ov/D+8oBlbRkjBs1R1Iua8hJ8cUfbdiW8EOdZuxcgaI=
github.com/libp2p/go-mplex v0.2.0/go.mod h1:0Oy/A9PQlwBytDRp4wSkFnzHYDKcpLot35JQ6msjvYQ=
github.com/libp2p/go-mplex v0.3.0 h1:U1T+vmCYJaEoDJPV1aq31N56hS+lJgb397GsylNSgrU=
github.com/libp2p/go-mplex v0.3.0/go.mod h1:0Oy/A9PQlwBytDRp4wSkFnzHYDKcpLot35JQ6msjvYQ=
github.com/libp2p/go-msgio v0.0.2/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ=
github.com/libp2p/go-msgio v0.0.3/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ=
github.com/libp2p/go-msgio v0.0.4 h1:agEFehY3zWJFUHK6SEMR7UYmk2z6kC3oeCM7ybLhguA=
......@@ -514,6 +536,8 @@ github.com/libp2p/go-ws-transport v0.2.0/go.mod h1:9BHJz/4Q5A9ludYWKoGCFC5gUElzl
github.com/libp2p/go-ws-transport v0.3.0/go.mod h1:bpgTJmRZAvVHrgHybCVyqoBmyLQ1fiZuEaBYusP5zsk=
github.com/libp2p/go-ws-transport v0.3.1 h1:ZX5rWB8nhRRJVaPO6tmkGI/Xx8XNboYX20PW5hXIscw=
github.com/libp2p/go-ws-transport v0.3.1/go.mod h1:bpgTJmRZAvVHrgHybCVyqoBmyLQ1fiZuEaBYusP5zsk=
github.com/libp2p/go-ws-transport v0.4.0 h1:9tvtQ9xbws6cA5LvqdE6Ne3vcmGB4f1z9SByggk4s0k=
github.com/libp2p/go-ws-transport v0.4.0/go.mod h1:EcIEKqf/7GDjth6ksuS/6p7R49V4CBY6/E7R/iyhYUA=
github.com/libp2p/go-yamux v1.2.2/go.mod h1:FGTiPvoV/3DVdgWpX+tM0OW3tsM+W5bSE3gZwqQTcow=
github.com/libp2p/go-yamux v1.2.3/go.mod h1:FGTiPvoV/3DVdgWpX+tM0OW3tsM+W5bSE3gZwqQTcow=
github.com/libp2p/go-yamux v1.3.0/go.mod h1:FGTiPvoV/3DVdgWpX+tM0OW3tsM+W5bSE3gZwqQTcow=
......@@ -523,6 +547,10 @@ github.com/libp2p/go-yamux v1.3.7 h1:v40A1eSPJDIZwz2AvrV3cxpTZEGDP11QJbukmEhYyQI
github.com/libp2p/go-yamux v1.3.7/go.mod h1:fr7aVgmdNGJK+N1g+b6DW6VxzbRCjCOejR/hkmpooHE=
github.com/libp2p/go-yamux v1.4.0 h1:7nqe0T95T2CWh40IdJ/tp8RMor4ubc9/wYZpB2a/Hx0=
github.com/libp2p/go-yamux v1.4.0/go.mod h1:fr7aVgmdNGJK+N1g+b6DW6VxzbRCjCOejR/hkmpooHE=
github.com/libp2p/go-yamux v1.4.1 h1:P1Fe9vF4th5JOxxgQvfbOHkrGqIZniTLf+ddhZp8YTI=
github.com/libp2p/go-yamux v1.4.1/go.mod h1:fr7aVgmdNGJK+N1g+b6DW6VxzbRCjCOejR/hkmpooHE=
github.com/libp2p/go-yamux/v2 v2.0.0 h1:vSGhAy5u6iHBq11ZDcyHH4Blcf9xlBhT4WQDoOE90LU=
github.com/libp2p/go-yamux/v2 v2.0.0/go.mod h1:NVWira5+sVUIU6tu1JWvaRn1dRnG+cawOJiflsAM+7U=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
......
......@@ -38,6 +38,8 @@ import (
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/metrics"
"github.com/libp2p/go-libp2p-core/peer"
gostream "github.com/libp2p/go-libp2p-gostream"
p2phttp "github.com/libp2p/go-libp2p-http"
noise "github.com/libp2p/go-libp2p-noise"
secio "github.com/libp2p/go-libp2p-secio"
tls "github.com/libp2p/go-libp2p-tls"
......@@ -127,10 +129,11 @@ func (p networkParams) String() string {
func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
var (
size = runenv.SizeParam("size")
concurrency = runenv.IntParam("concurrency")
networkParams = parseNetworkConfig(runenv)
memorySnapshots = parseMemorySnapshotsParam(runenv)
size = runenv.SizeParam("size")
concurrency = runenv.IntParam("concurrency")
blockDiagnostics = runenv.BooleanParam("block_diagnostics")
networkParams = parseNetworkConfig(runenv)
memorySnapshots = parseMemorySnapshotsParam(runenv)
)
runenv.RecordMessage("started test instance")
runenv.RecordMessage("network params: %v", networkParams)
......@@ -151,6 +154,7 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
maxMemoryPerPeer := runenv.SizeParam("max_memory_per_peer")
maxMemoryTotal := runenv.SizeParam("max_memory_total")
maxInProgressRequests := runenv.IntParam("max_in_progress_requests")
var (
// make datastore, blockstore, dag service, graphsync
bs = blockstore.NewBlockstore(dss.MutexWrap(datastore))
......@@ -161,10 +165,81 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
storeutil.StorerForBlockstore(bs),
gsi.MaxMemoryPerPeerResponder(maxMemoryPerPeer),
gsi.MaxMemoryResponder(maxMemoryTotal),
gsi.MaxInProgressRequests(uint64(maxInProgressRequests)),
)
recorder = &runRecorder{memorySnapshots: memorySnapshots, runenv: runenv}
recorder = &runRecorder{memorySnapshots: memorySnapshots, blockDiagnostics: blockDiagnostics, runenv: runenv}
)
startTimes := make(map[struct {
peer.ID
gs.RequestID
}]time.Time)
var startTimesLk gosync.RWMutex
gsync.RegisterIncomingRequestHook(func(p peer.ID, request gs.RequestData, hookActions gs.IncomingRequestHookActions) {
hookActions.ValidateRequest()
startTimesLk.Lock()
startTimes[struct {
peer.ID
gs.RequestID
}{p, request.ID()}] = time.Now()
startTimesLk.Unlock()
})
gsync.RegisterOutgoingRequestHook(func(p peer.ID, request gs.RequestData, hookActions gs.OutgoingRequestHookActions) {
startTimesLk.Lock()
startTimes[struct {
peer.ID
gs.RequestID
}{p, request.ID()}] = time.Now()
startTimesLk.Unlock()
})
gsync.RegisterCompletedResponseListener(func(p peer.ID, request gs.RequestData, status gs.ResponseStatusCode) {
startTimesLk.RLock()
startTime, ok := startTimes[struct {
peer.ID
gs.RequestID
}{p, request.ID()}]
startTimesLk.RUnlock()
if ok && status == gs.RequestCompletedFull {
duration := time.Since(startTime)
recorder.recordRun(duration)
}
})
gsync.RegisterOutgoingBlockHook(func(p peer.ID, request gs.RequestData, block gs.BlockData, ha gs.OutgoingBlockHookActions) {
startTimesLk.RLock()
startTime := startTimes[struct {
peer.ID
gs.RequestID
}{p, request.ID()}]
startTimesLk.RUnlock()
recorder.recordBlockQueued(fmt.Sprintf("for request %d at %s", request.ID(), time.Since(startTime)))
})
gsync.RegisterBlockSentListener(func(p peer.ID, request gs.RequestData, block gs.BlockData) {
startTimesLk.RLock()
startTime := startTimes[struct {
peer.ID
gs.RequestID
}{p, request.ID()}]
startTimesLk.RUnlock()
recorder.recordBlock(fmt.Sprintf("sent for request %d at %s", request.ID(), time.Since(startTime)))
})
gsync.RegisterIncomingResponseHook(func(p peer.ID, response gs.ResponseData, actions gs.IncomingResponseHookActions) {
startTimesLk.RLock()
startTime := startTimes[struct {
peer.ID
gs.RequestID
}{p, response.RequestID()}]
startTimesLk.RUnlock()
recorder.recordResponse(fmt.Sprintf("for request %d at %s", response.RequestID(), time.Since(startTime)))
})
gsync.RegisterIncomingBlockHook(func(p peer.ID, response gs.ResponseData, block gs.BlockData, ha gs.IncomingBlockHookActions) {
startTimesLk.RLock()
startTime := startTimes[struct {
peer.ID
gs.RequestID
}{p, response.RequestID()}]
startTimesLk.RUnlock()
recorder.recordBlock(fmt.Sprintf("processed for request %d, cid %s, at %s", response.RequestID(), block.Link().String(), time.Since(startTime)))
})
defer initCtx.SyncClient.MustSignalAndWait(ctx, "done", runenv.TestInstanceCount)
switch runenv.TestGroupID {
......@@ -175,37 +250,7 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
runenv.RecordMessage("we are the provider")
defer runenv.RecordMessage("done provider")
startTimes := make(map[struct {
peer.ID
gs.RequestID
}]time.Time)
var startTimesLk gosync.Mutex
gsync.RegisterIncomingRequestHook(func(p peer.ID, request gs.RequestData, hookActions gs.IncomingRequestHookActions) {
hookActions.ValidateRequest()
startTimesLk.Lock()
startTimes[struct {
peer.ID
gs.RequestID
}{p, request.ID()}] = time.Now()
startTimesLk.Unlock()
})
gsync.RegisterCompletedResponseListener(func(p peer.ID, request gs.RequestData, status gs.ResponseStatusCode) {
startTimesLk.Lock()
startTime, ok := startTimes[struct {
peer.ID
gs.RequestID
}{p, request.ID()}]
startTimesLk.Unlock()
if ok && status == gs.RequestCompletedFull {
duration := time.Since(startTime)
recorder.recordRun(duration)
}
})
gsync.RegisterBlockSentListener(func(p peer.ID, request gs.RequestData, block gs.BlockData) {
recorder.recordBlock()
})
err := runProvider(ctx, runenv, initCtx, dagsrv, size, ip, networkParams, concurrency, memorySnapshots, recorder)
err := runProvider(ctx, runenv, initCtx, dagsrv, size, host, ip, networkParams, concurrency, memorySnapshots, recorder)
if err != nil {
runenv.RecordMessage("Error running provider: %s", err.Error())
}
......@@ -213,16 +258,13 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
case "requestors":
runenv.RecordMessage("we are the requestor")
defer runenv.RecordMessage("done requestor")
gsync.RegisterIncomingBlockHook(func(p peer.ID, request gs.ResponseData, block gs.BlockData, ha gs.IncomingBlockHookActions) {
recorder.recordBlock()
})
p := peers[0]
if err := host.Connect(ctx, *p.peerAddr); err != nil {
return err
}
runenv.RecordMessage("done dialling provider")
return runRequestor(ctx, runenv, initCtx, gsync, p, dagsrv, networkParams, concurrency, size, memorySnapshots, recorder)
return runRequestor(ctx, runenv, initCtx, gsync, host, p, dagsrv, networkParams, concurrency, size, memorySnapshots, recorder)
default:
panic(fmt.Sprintf("unsupported group ID: %s\n", runenv.TestGroupID))
......@@ -292,7 +334,7 @@ func parseMemorySnapshotsParam(runenv *runtime.RunEnv) snapshotMode {
}
}
func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, gsync gs.GraphExchange, p *AddrInfo, dagsrv format.DAGService, networkParams []networkParams, concurrency int, size uint64, memorySnapshots snapshotMode, recorder *runRecorder) error {
func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, gsync gs.GraphExchange, h host.Host, p *AddrInfo, dagsrv format.DAGService, networkParams []networkParams, concurrency int, size uint64, memorySnapshots snapshotMode, recorder *runRecorder) error {
var (
cids []cid.Cid
// create a selector for the whole UnixFS dag
......@@ -300,7 +342,17 @@ func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.Init
)
runHTTPTest := runenv.BooleanParam("compare_http")
useLibP2p := runenv.BooleanParam("use_libp2p_http")
var client *http.Client
if runHTTPTest {
if useLibP2p {
tr := &http.Transport{}
tr.RegisterProtocol("libp2p", p2phttp.NewTransport(h))
client = &http.Client{Transport: tr}
} else {
client = http.DefaultClient
}
}
for round, np := range networkParams {
var (
topicCid = sync.NewTopic(fmt.Sprintf("cid-%d", round), []cid.Cid{})
......@@ -359,7 +411,13 @@ func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.Init
if runHTTPTest {
// request file directly over http
start = time.Now()
resp, err := http.Get(fmt.Sprintf("http://%s:8080/%s", p.ip.String(), c.String()))
var resp *http.Response
var err error
if useLibP2p {
resp, err = client.Get(fmt.Sprintf("libp2p://%s/%s", p.peerAddr.ID.String(), c.String()))
} else {
resp, err = client.Get(fmt.Sprintf("http://%s:8080/%s", p.ip.String(), c.String()))
}
if err != nil {
panic(err)
}
......@@ -389,24 +447,37 @@ func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.Init
return nil
}
func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, dagsrv format.DAGService, size uint64, ip net.IP, networkParams []networkParams, concurrency int, memorySnapshots snapshotMode, recorder *runRecorder) error {
func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, dagsrv format.DAGService, size uint64, h host.Host, ip net.IP, networkParams []networkParams, concurrency int, memorySnapshots snapshotMode, recorder *runRecorder) error {
var (
cids []cid.Cid
bufferedDS = format.NewBufferedDAG(ctx, dagsrv)
)
runHTTPTest := runenv.BooleanParam("compare_http")
useLibP2p := runenv.BooleanParam("use_libp2p_http")
var svr *http.Server
if runHTTPTest {
// start an http server on port 8080
runenv.RecordMessage("creating http server at http://%s:8080", ip.String())
svr = &http.Server{Addr: ":8080"}
go func() {
if err := svr.ListenAndServe(); err != nil {
runenv.RecordMessage("shutdown http server at http://%s:8080", ip.String())
}
}()
if useLibP2p {
listener, _ := gostream.Listen(h, p2phttp.DefaultP2PProtocol)
defer listener.Close()
// start an http server on port 8080
runenv.RecordMessage("creating http server at libp2p://%s", h.ID().String())
svr = &http.Server{}
go func() {
if err := svr.Serve(listener); err != nil {
runenv.RecordMessage("shutdown http server at libp2p://%s", h.ID().String())
}
}()
} else {
runenv.RecordMessage("creating http server at http://%s:8080", ip.String())
svr = &http.Server{Addr: ":8080"}
go func() {
if err := svr.ListenAndServe(); err != nil {
runenv.RecordMessage("shutdown http server at http://%s:8080", ip.String())
}
}()
}
}
for round, np := range networkParams {
......@@ -657,17 +728,23 @@ func writeHeap(runenv *runtime.RunEnv, size uint64, np networkParams, concurrenc
}
type runRecorder struct {
memorySnapshots snapshotMode
index int
np networkParams
size uint64
concurrency int
round int
runenv *runtime.RunEnv
measurement string
memorySnapshots snapshotMode
blockDiagnostics bool
index int
queuedIndex int
responseIndex int
np networkParams
size uint64
concurrency int
round int
runenv *runtime.RunEnv
measurement string
}
func (rr *runRecorder) recordBlock() {
func (rr *runRecorder) recordBlock(postfix string) {
if rr.blockDiagnostics {
rr.runenv.RecordMessage("block %d %s", rr.index, postfix)
}
if rr.memorySnapshots == snapshotDetailed {
if rr.index%detailedSnapshotFrequency == 0 {
recordSnapshots(rr.runenv, rr.size, rr.np, rr.concurrency, fmt.Sprintf("incremental-%d", rr.index))
......@@ -676,6 +753,20 @@ func (rr *runRecorder) recordBlock() {
rr.index++
}
func (rr *runRecorder) recordBlockQueued(postfix string) {
if rr.blockDiagnostics {
rr.runenv.RecordMessage("block %d queued %s", rr.queuedIndex, postfix)
}
rr.queuedIndex++
}
func (rr *runRecorder) recordResponse(postfix string) {
if rr.blockDiagnostics {
rr.runenv.RecordMessage("response %d received %s", rr.responseIndex, postfix)
}
rr.responseIndex++
}
func (rr *runRecorder) recordRun(duration time.Duration) {
rr.runenv.RecordMessage("\t<<< graphsync request complete with no errors")
rr.runenv.RecordMessage("***** ROUND %d observed duration (lat=%s,bw=%d): %s", rr.round, rr.np.latency, rr.np.bandwidth, duration)
......
......@@ -28,7 +28,10 @@ raw_leaves = { type = "bool", desc = "should unixfs leaves be left unwrapped", d
disk_store = { type = "bool", desc = "should data be stored on disk (true) or memory (false)", default = "false"}
memory_snapshots = { type = "string", desc = "what kind of memory snapshots to take (none, simple, detailed)", default = "none" }
compare_http = { type = "bool", desc = "run a comparison against http", default = "true"}
max_memory_per_peer = { type = "int", desc = "max memory a responder can queue up per peer", default = "64MiB"}
max_memory_total = { type = "int", desc = "max memory a responder can queue up total", default = "512MiB"}
unlimited_bandwidth_case = { type = "bool", desc = "run a comparison against http", default = "true"}
no_latency_case = { type = "bool", desc = "run a comparison against http", default = "true"}
\ No newline at end of file
max_memory_per_peer = { type = "int", desc = "max memory a responder can queue up per peer", default = "16MiB"}
max_memory_total = { type = "int", desc = "max memory a responder can queue up total", default = "256MiB"}
unlimited_bandwidth_case = { type = "bool", desc = "disable unlimited bandwidth test case", default = "true"}
no_latency_case = { type = "bool", desc = "disable no latency test case", default = "true"}
block_diagnostics = { type = "bool", desc = "output records of block times", default = "false" }
use_libp2p_http = { type = "bool", desc = "use libp2p over http for http comparison", default = "false"}
max_in_progress_requests = { type = "int", desc = "max requests processed by a responder at once", default = "6" }
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment